backend.lib.search

import hashlib
import zipfile
import secrets
import random
import json
import math
import csv
import os

from datetime import datetime, timezone
from pathlib import Path
from abc import ABC, abstractmethod

from common.config_manager import config
from backend.lib.processor import BasicProcessor
from common.lib.helpers import strip_tags, dict_search_and_update, remove_nuls, HashCache
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, MapItemException


class Search(BasicProcessor, ABC):
    """
    Process search queries from the front-end

    This class can be subclassed to define a 'search worker', which
    collects items from a given data source to create a dataset according to
    parameters provided by the user via the web interface.

    Each data source defines a search worker that contains code to interface
    with e.g. an API or a database server. The search worker also contains a
    definition of the parameters that can be configured by the user, via the
    `options` attribute and/or the `get_options()` class method.
    """
    #: Search worker identifier - should end with 'search' for
    #: backwards-compatibility reasons. For example, `instagram-search`.
    type = "abstract-search"

    #: Number of workers of this type that can run in parallel. Be careful
    #: with values higher than 1, as they make it easy to violate e.g. API
    #: rate limits.
    max_workers = 1

    #: This attribute is only used by search workers that collect data from a
    #: local database, to determine the name of the table to collect the data
    #: from. If this is `4chan`, for example, items are read from
    #: `posts_4chan`.
    prefix = ""

    # Columns to return in the CSV file
    # Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
    return_cols = ['thread_id', 'body', 'subject', 'timestamp']

    import_error_count = 0
    import_warning_count = 0

    def process(self):
        """
        Create a 4CAT dataset from a data source

        Gets query details, passes them on to the object's search method, and
        writes the results to a file. If that all went well, the query and job
        are marked as finished.
        """

        query_parameters = self.dataset.get_parameters()
        results_file = self.dataset.get_results_path()

        self.log.info("Querying: %s" % str({k: v for k, v in query_parameters.items() if not self.get_options().get(k, {}).get("sensitive", False)}))

        # Execute the relevant query: import from an uploaded file if one is
        # given, otherwise run the data source's own search method
        try:
            if query_parameters.get("file"):
                items = self.import_from_file(query_parameters.get("file"))
            else:
                items = self.search(query_parameters)
        except WorkerInterruptedException:
            raise ProcessorInterruptedException("Interrupted while collecting data, trying again later.")

        # Write items to file and update the dataset status to finished
        num_items = 0
        if items:
            self.dataset.update_status("Writing collected data to dataset file")
            if self.extension == "csv":
                num_items = self.items_to_csv(items, results_file)
            elif self.extension == "ndjson":
                num_items = self.items_to_ndjson(items, results_file)
            elif self.extension == "zip":
                num_items = self.items_to_archive(items, results_file)
            else:
                raise NotImplementedError("Datasource query cannot be saved as %s file" % results_file.suffix)

            self.dataset.update_status("Query finished, results are available.")
        elif items is not None:
            self.dataset.update_status("Query finished, no results found.")

        if self.import_warning_count == 0 and self.import_error_count == 0:
            self.dataset.finish(num_rows=num_items)
        else:
            self.dataset.update_status(f"All data imported. {str(self.import_error_count) + ' item(s) had an unexpected format and cannot be used in 4CAT processors. ' if self.import_error_count != 0 else ''}{str(self.import_warning_count) + ' item(s) were missing some data fields. ' if self.import_warning_count != 0 else ''}\n\nMissing data is noted in the `missing_fields` column of this dataset's CSV file; see also the dataset log for details.", is_final=True)
            self.dataset.finish(num_rows=num_items)

    def search(self, query):
        """
        Search for items matching the given query

        The real work is done by the get_items() method of the descending
        class. This method just provides some scaffolding and processing
        of results via `after_search()`, if it is defined.

        :param dict query:  Query parameters
        :return:  Iterable of matching items, or None if there are no results.
        """
        items = self.get_items(query)

        if not items:
            return None

        # search workers may define an 'after_search' hook that is called
        # after the query is first completed
        if hasattr(self, "after_search") and callable(self.after_search):
            items = self.after_search(items)

        return items

    @abstractmethod
    def get_items(self, query):
        """
        Fetch items for a given query

        To be implemented by descending classes!

        :param dict query:  Query parameters
        :return Generator:  A generator or iterable that returns items
          collected according to the provided parameters.
        """
        pass

    def import_from_file(self, path):
        """
        Import items from an external file

        By default, this reads a file and parses each line as JSON, returning
        the parsed object as an item. This works for NDJSON files. Data
        sources that require importing from other or multiple file types can
        override this method.

        This method has a generic implementation, but in most cases would be
        redefined in descending classes to account for nuances in incoming
        data for a given data source.

        The file is considered disposable and is deleted after importing.

        :param str path:  Path to read from
        :return Generator:  Yields all items in the file, item by item.
        """
        if not isinstance(path, Path):
            path = Path(path)
        if not path.exists():
            return []

        import_warnings = {}

        # Check if processor and dataset can use map_item
        check_map_item = self.map_item_method_available(dataset=self.dataset)
        if not check_map_item:
            self.log.warning(
                f"Processor {self.type} importing item without map_item method for Dataset {self.dataset.type} - {self.dataset.key}")

        with path.open(encoding="utf-8") as infile:
            unmapped_items = False
            for i, line in enumerate(infile):
                if self.interrupted:
                    raise WorkerInterruptedException()

                try:
                    # remove NUL bytes here because they trip up a lot of
                    # other things
                    item = json.loads(line.replace("\0", ""))
                except json.JSONDecodeError:
                    warning = (f"An item on line {i + 1:,} of the imported file could not be parsed as JSON - this may "
                               f"indicate that the file you uploaded was incomplete and you need to try uploading it "
                               f"again. The item will be ignored.")

                    if warning not in import_warnings:
                        import_warnings[warning] = 0
                    import_warnings[warning] += 1
                    continue

                # include import metadata in the item
                new_item = {
                    **item["data"],
                    "__import_meta": {k: v for k, v in item.items() if k != "data"}
                }

                # Check map item here!
                if check_map_item:
                    try:
                        mapped_item = self.get_mapped_item(new_item)

                        # keep track of items that raised a warning
                        # this means the item could be mapped, but there is
                        # some information the user should take note of
                        warning = mapped_item.get_message()
                        if not warning and mapped_item.get_missing_fields():
                            # usually this would have an explicit warning, but
                            # if not it's still useful to know
                            warning = f"The following fields are missing for this item and will be replaced with a default value: {', '.join(mapped_item.get_missing_fields())}"

                        if warning:
                            if warning not in import_warnings:
                                import_warnings[warning] = 0
                            import_warnings[warning] += 1
                            self.import_warning_count += 1

                    except MapItemException as e:
                        # NOTE: we still yield the unmappable item; perhaps a
                        # processor's map_item method needs to be updated to
                        # account for this new item
                        self.import_error_count += 1
                        self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e, warn_admins=unmapped_items is False)
                        unmapped_items = True

                yield new_item

        # warnings were raised about some items
        # log these, with the number of items each warning applied to
        if sum(import_warnings.values()) > 0:
            self.dataset.log("While importing, the following issues were raised:")
            for warning, num_items in import_warnings.items():
                self.dataset.log(f"  {warning} (for {num_items:,} item(s))")

        path.unlink()
        self.dataset.delete_parameter("file")

    def items_to_csv(self, results, filepath):
        """
        Takes an iterable of result dictionaries, converts them to CSV rows,
        and writes these to the given location. This is mostly a generic
        dictionary-to-CSV processor, but some specific processing is done on
        the "body" key to strip HTML from it, and a human-readable timestamp
        is provided next to the UNIX timestamp.

        :param Iterable results:  List of dict rows from data source.
        :param Path filepath:  Filepath for the resulting csv

        :return int:  Number of items that were processed
        """
        if not filepath:
            raise ResourceWarning("No result file for query")

        # write the dictionary to a csv
        if not isinstance(filepath, Path):
            filepath = Path(filepath)

        # cache hashed author names, so the hashing function (which is
        # relatively expensive) is not run too often
        pseudonymise_author = self.parameters.get("pseudonymise", None) == "pseudonymise"
        anonymise_author = self.parameters.get("pseudonymise", None) == "anonymise"

        # prepare hasher (which we may or may not need)
        # we use BLAKE2 for its (so far!) resistance against cryptanalysis
        # and speed, since we will potentially need to calculate a large
        # amount of hashes
        salt = secrets.token_bytes(16)
        hasher = hashlib.blake2b(digest_size=24, salt=salt)
        hash_cache = HashCache(hasher)

        processed = 0
        header_written = False
        with filepath.open("w", encoding="utf-8") as csvfile:
            # Parsing: remove the HTML tags, but keep the <br> as a newline
            # Takes around 1.5 times longer
            for row in results:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while writing results to file")

                if not header_written:
                    fieldnames = list(row.keys())
                    fieldnames.append("unix_timestamp")
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames, lineterminator='\n')
                    writer.writeheader()
                    header_written = True

                processed += 1

                # Create human-readable dates from timestamp
                if "timestamp" in row:
                    # Data sources should have "timestamp" as a unix epoch
                    # integer, but do some conversion if this is not the case.
                    timestamp = row["timestamp"]
                    if not isinstance(timestamp, int):
                        if isinstance(timestamp, str) and "-" not in timestamp:
                            # String representation of epoch timestamp
                            timestamp = int(timestamp)
                        elif isinstance(timestamp, str) and "-" in timestamp:
                            # Date string
                            try:
                                timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").replace(
                                    tzinfo=timezone.utc).timestamp()
                            except ValueError:
                                timestamp = "undefined"
                        else:
                            timestamp = "undefined"

                    # Add a human-readable date format as well, if we have a
                    # valid timestamp.
                    row["unix_timestamp"] = timestamp
                    if timestamp != "undefined":
                        row["timestamp"] = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
                    else:
                        row["timestamp"] = timestamp
                else:
                    row["timestamp"] = "undefined"

                # Parse html to text
                if row["body"]:
                    row["body"] = strip_tags(row["body"])

                # replace author column with salted hash of the author name,
                # if pseudonymisation is enabled
                if pseudonymise_author:
                    author_fields = [field for field in row.keys() if field.startswith("author")]
                    for author_field in author_fields:
                        row[author_field] = hash_cache.update_cache(row[author_field])

                # or remove data altogether, if it's anonymisation instead
                elif anonymise_author:
                    for field in row.keys():
                        if field.startswith("author"):
                            row[field] = "REDACTED"

                row = remove_nuls(row)
                writer.writerow(row)

        return processed

    def items_to_ndjson(self, items, filepath):
        """
        Save retrieved items as an ndjson file

        NDJSON is a file with one valid JSON value per line; in this case,
        each of these JSON values represents a retrieved item. This is useful
        if the retrieved data cannot easily be completely stored as a flat
        CSV file and we want to leave the choice of how to flatten it to the
        user. Note that no conversion (e.g. HTML stripping or
        pseudonymisation) is done here - the items are saved as-is.

        :param Iterator items:  Items to save
        :param Path filepath:  Location to save results file
        :return int:  Number of items that were processed
        """
        if not filepath:
            raise ResourceWarning("No valid results path supplied")

        # figure out if we need to filter the data somehow
        hash_cache = None
        if self.parameters.get("pseudonymise") == "pseudonymise":
            # cache hashed author names, so the hashing function (which is
            # relatively expensive) is not run too often
            hasher = hashlib.blake2b(digest_size=24)
            hasher.update(str(config.get('ANONYMISATION_SALT')).encode("utf-8"))
            hash_cache = HashCache(hasher)

        processed = 0
        with filepath.open("w", encoding="utf-8", newline="") as outfile:
            for item in items:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while writing results to file")

                # if pseudo/anonymising, filter data recursively
                if self.parameters.get("pseudonymise") == "pseudonymise":
                    item = dict_search_and_update(item, ["author*"], hash_cache.update_cache)
                elif self.parameters.get("anonymise") == "anonymise":
                    item = dict_search_and_update(item, ["author*"], lambda v: "REDACTED")

                outfile.write(json.dumps(item) + "\n")
                processed += 1

        return processed

    def items_to_archive(self, items, filepath):
        """
        Save retrieved items as an archive

        Assumes that `items` is a Path object referring to a folder
        containing the files to be archived. The folder will be removed
        afterwards.

        :param items:  Path to the folder containing the files to archive
        :param filepath:  Where to store the archive
        :return int:  Number of items
        """
        num_items = len(os.listdir(items))
        self.write_archive_and_finish(items, None, zipfile.ZIP_STORED, False)
        return num_items


class SearchWithScope(Search, ABC):
    """
    Search class with more complex search pathways

    Some data sources may afford more complex search modes besides simply
    returning all items matching a given set of parameters. In particular,
    they may allow for expanding the search scope to the thread in which a
    given matching item occurs. This subclass allows for the following
    additional search modes:

    - All items in a thread containing a matching item
    - All items in a thread containing at least x% matching items
    """

    def search(self, query):
        """
        Complex search

        Allows for two separate search pathways, one of which is chosen based
        on the search query. Additionally, extra items are added to the
        results if a wider search scope is requested.

        :param dict query:  Query parameters
        :return:  Matching items, as iterable, or None if no items match.
        """
        mode = self.get_search_mode(query)

        if mode == "simple":
            items = self.get_items_simple(query)
        else:
            items = self.get_items_complex(query)

        if not items:
            return None

        # handle the various search scope options after retrieving the
        # initial item list
        if query.get("search_scope", None) == "dense-threads":
            # dense threads - all items in all threads in which the requested
            # proportion of items matches
            # first, get the number of items for all threads in which
            # matching items occur and that are long enough
            thread_ids = tuple([item["thread_id"] for item in items])
            self.dataset.update_status("Retrieving thread metadata for %i threads" % len(thread_ids))
            try:
                min_length = int(query.get("scope_length", 30))
            except ValueError:
                min_length = 30

            thread_sizes = self.get_thread_sizes(thread_ids, min_length)

            # determine how many matching items occur per thread in the
            # initial data set
            items_per_thread = {}
            for item in items:
                if item["thread_id"] not in items_per_thread:
                    items_per_thread[item["thread_id"]] = 0

                items_per_thread[item["thread_id"]] += 1

            # keep all thread IDs where that number meets at least the
            # requested density
            qualifying_thread_ids = set()

            self.dataset.update_status("Filtering dense threads")
            try:
                percentage = int(query.get("scope_density")) / 100
            except (ValueError, TypeError):
                percentage = 0.15

            for thread_id in items_per_thread:
                if thread_id not in thread_sizes:
                    # thread not long enough
                    continue
                required_items = math.ceil(percentage * thread_sizes[thread_id])
                if items_per_thread[thread_id] >= required_items:
                    qualifying_thread_ids.add(thread_id)

            if len(qualifying_thread_ids) > 25000:
                self.dataset.update_status(
                    "Too many matching threads (%i) to get full thread data for, aborting. Please try again with a narrower query." % len(
                        qualifying_thread_ids))
                return None

            if qualifying_thread_ids:
                self.dataset.update_status("Fetching all items in %i threads" % len(qualifying_thread_ids))
                items = self.fetch_threads(tuple(qualifying_thread_ids))
            else:
                self.dataset.update_status("No threads matched the full thread search parameters.")
                return None

        elif query.get("search_scope", None) == "full-threads":
            # get all items in threads containing at least one matching item
            thread_ids = tuple(set([item["thread_id"] for item in items]))
            if len(thread_ids) > 25000:
                self.dataset.update_status(
                    "Too many matching threads (%i) to get full thread data for, aborting. Please try again with a narrower query." % len(
                        thread_ids))
                return None

            self.dataset.update_status("Retrieving all items from %i threads" % len(thread_ids))
            items = self.fetch_threads(thread_ids)

        elif mode == "complex":
            # create a random sample subset of all items if requested. for
            # complex queries, this can usually only be done at this point;
            # for simple queries, this is handled in get_items_simple
            if query.get("search_scope", None) == "random-sample":
                try:
                    self.dataset.update_status("Creating random sample")
                    sample_size = int(query.get("sample_size", 5000))
                    items = list(items)
                    random.shuffle(items)
                    return items[0:sample_size]
                except ValueError:
                    pass

        # search workers may define an 'after_search' hook that is called
        # after the query is first completed
        if hasattr(self, "after_search") and callable(self.after_search):
            items = self.after_search(items)

        return items

    def get_items(self, query):
        """
        Not available in this subclass
        """
        raise NotImplementedError("Cannot use get_items() directly in SearchWithScope")

    def get_search_mode(self, query):
        """
        Determine what search mode to use

        Can be overridden by child classes!

        :param dict query:  Query parameters
        :return str:  'simple' or 'complex'
        """
        if query.get("body_match", None) or query.get("subject_match", None):
            mode = "complex"
        else:
            mode = "simple"

        return mode

    @abstractmethod
    def get_items_simple(self, query):
        """
        Get items via the simple pathway

        If `get_search_mode()` returned `"simple"`, this method is used to
        retrieve items. What this method does exactly is up to the descending
        class.

        :param dict query:  Query parameters
        :return Iterable:  Items that match the parameters
        """
        pass

    @abstractmethod
    def get_items_complex(self, query):
        """
        Get items via the complex pathway

        If `get_search_mode()` returned `"complex"`, this method is used to
        retrieve items. What this method does exactly is up to the descending
        class.

        :param dict query:  Query parameters
        :return Iterable:  Items that match the parameters
        """
        pass

    @abstractmethod
    def fetch_posts(self, post_ids, where=None, replacements=None):
        """
        Get items for given IDs

        :param Iterable post_ids:  Post IDs to e.g. match against a database
        :param where:  Deprecated, do not use
        :param replacements:  Deprecated, do not use
        :return Iterable[dict]:  Post objects
        """
        pass

    @abstractmethod
    def fetch_threads(self, thread_ids):
        """
        Get items for given thread IDs

        :param Iterable thread_ids:  Thread IDs to e.g. match against a database
        :return Iterable[dict]:  Post objects
        """
        pass

    @abstractmethod
    def get_thread_sizes(self, thread_ids, min_length):
        """
        Get thread lengths for all threads

        :param tuple thread_ids:  List of thread IDs to fetch lengths for
        :param int min_length:  Min length for a thread to be included in the
        results
        :return dict:  Thread sizes, with thread IDs as keys
        """
        pass
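
To make the contract above concrete, a minimal search worker might look as follows. This is a sketch, not code from the module: the `ExampleSearch` class, the `example-search` identifier, and the canned items are hypothetical, and a real data source would replace them with calls to an actual API or database, plus an `options` definition for the web interface.

from backend.lib.search import Search


class ExampleSearch(Search):
    """
    Hypothetical search worker for a fictional data source.
    """
    type = "example-search"  # identifier; should end in 'search'
    extension = "csv"        # determines how process() writes the results
    max_workers = 1          # keep at 1 to respect upstream rate limits

    def get_items(self, query):
        # a real worker would query an API or database here, using the
        # parameters the user entered via the web interface; items are
        # dicts containing at least the mandatory columns
        for i in range(int(query.get("amount", 10))):
            yield {
                "thread_id": "thread-1",
                "subject": f"Example item {i}",
                "body": "Example <br>body",  # HTML is stripped for CSV output
                "timestamp": 1700000000 + i,  # unix epoch, as expected
            }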
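
For reference, `import_from_file()` expects one JSON object per line, with the item itself under a `data` key; any other keys on that line are preserved as import metadata under `__import_meta`. A sketch of producing such a file (the `collected_at` metadata field is purely illustrative):

import json

# each line: the item under "data", plus arbitrary import metadata
items = [
    {"data": {"thread_id": "t1", "subject": "Hi", "body": "Hello", "timestamp": 1700000000},
     "collected_at": 1700000100},  # hypothetical metadata field
]

with open("import.ndjson", "w", encoding="utf-8") as outfile:
    for item in items:
        outfile.write(json.dumps(item) + "\n")

# import_from_file() would then yield:
# {"thread_id": "t1", ..., "__import_meta": {"collected_at": 1700000100}}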
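
The pseudonymisation logic in `items_to_csv()` and `items_to_ndjson()` relies on `HashCache` from `common.lib.helpers` to memoise salted BLAKE2b digests, so that a recurring author name is only hashed once. Its internals are not shown in this module; a minimal stand-in with the same `update_cache()` interface, assuming copy-based memoisation, could look like this:

import hashlib
import secrets


class SimpleHashCache:
    """Illustrative stand-in for common.lib.helpers.HashCache."""

    def __init__(self, hasher):
        self.hasher = hasher
        self.cache = {}

    def update_cache(self, value):
        # hash each distinct value once, then reuse the digest
        if value not in self.cache:
            hasher = self.hasher.copy()  # keep the salted base hasher intact
            hasher.update(str(value).encode("utf-8"))
            self.cache[value] = hasher.hexdigest()
        return self.cache[value]


# per-dataset random salt, as in items_to_csv(): the same author always gets
# the same pseudonym within a dataset, but pseudonyms differ across datasets
hasher = hashlib.blake2b(digest_size=24, salt=secrets.token_bytes(16))
cache = SimpleHashCache(hasher)
assert cache.update_cache("alice") == cache.update_cache("alice")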
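
Finally, the "dense threads" scope in `SearchWithScope.search()` reduces to a simple threshold test: a thread qualifies if it is at least `scope_length` items long and at least `scope_density`% of its items matched the query. A worked example with toy numbers:

import math

thread_sizes = {"t1": 100, "t2": 40}  # threads meeting the minimum length
items_per_thread = {"t1": 20, "t2": 3, "t3": 5}  # matching items per thread
percentage = 0.15  # scope_density of 15%

qualifying = set()
for thread_id, matches in items_per_thread.items():
    if thread_id not in thread_sizes:
        continue  # t3 was shorter than scope_length and is skipped
    # t1 needs ceil(0.15 * 100) = 15 matches; t2 needs ceil(0.15 * 40) = 6
    if matches >= math.ceil(percentage * thread_sizes[thread_id]):
        qualifying.add(thread_id)

print(qualifying)  # {'t1'} - only t1 is dense enough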