backend.lib.search
import hashlib
import zipfile
import secrets
import random
import json
import math
import csv
import os

from pathlib import Path
from abc import ABC, abstractmethod

from common.config_manager import config
from backend.lib.processor import BasicProcessor
from common.lib.helpers import strip_tags, dict_search_and_update, remove_nuls, HashCache
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, MapItemException


class Search(BasicProcessor, ABC):
    """
    Process search queries from the front-end

    This class can be descended from to define a 'search worker', which
    collects items from a given data source to create a dataset according to
    parameters provided by the user via the web interface.

    Each data source defines a search worker that contains code to interface
    with e.g. an API or a database server. The search worker also contains a
    definition of the parameters that can be configured by the user, via the
    `options` attribute and/or the `get_options()` class method.
    """
    #: Search worker identifier - should end with 'search' for
    #: backwards-compatibility reasons. For example, `instagram-search`.
    type = "abstract-search"

    #: Amount of workers of this type that can run in parallel. Be careful with
    #: this, because values higher than 1 will mean that e.g. API rate limits
    #: are easily violated.
    max_workers = 1

    #: This attribute is only used by search workers that collect data from a
    #: local database, to determine the name of the table to collect the data
    #: from. If this is `4chan`, for example, items are read from
    #: `posts_4chan`.
    prefix = ""

    # Columns to return in csv
    # Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
    return_cols = ['thread_id', 'body', 'subject', 'timestamp']

    import_error_count = 0
    import_warning_count = 0

    def process(self):
        """
        Create 4CAT dataset from a data source

        Gets query details, passes them on to the object's search method, and
        writes the results to a file. If that all went well, the query and job
        are marked as finished.
        """

        query_parameters = self.dataset.get_parameters()
        results_file = self.dataset.get_results_path()

        self.log.info("Querying: %s" % str({k: v for k, v in query_parameters.items() if not self.get_options().get(k, {}).get("sensitive", False)}))

        # Execute the relevant query (string-based, random, countryflag-based)
        try:
            if query_parameters.get("file"):
                items = self.import_from_file(query_parameters.get("file"))
            else:
                items = self.search(query_parameters)
        except WorkerInterruptedException:
            raise ProcessorInterruptedException("Interrupted while collecting data, trying again later.")

        # Write items to file and update the DataBase status to finished
        num_items = 0
        if items:
            self.dataset.update_status("Writing collected data to dataset file")
            if self.extension == "csv":
                num_items = self.items_to_csv(items, results_file)
            elif self.extension == "ndjson":
                num_items = self.items_to_ndjson(items, results_file)
            elif self.extension == "zip":
                num_items = self.items_to_archive(items, results_file)
            else:
                raise NotImplementedError("Datasource query cannot be saved as %s file" % results_file.suffix)

            self.dataset.update_status("Query finished, results are available.")
        elif items is not None:
            self.dataset.update_status("Query finished, no results found.")

        if self.import_warning_count == 0 and self.import_error_count == 0:
            self.dataset.finish(num_rows=num_items)
        else:
            self.dataset.update_status(f"All data imported. {str(self.import_error_count) + ' item(s) had an unexpected format and cannot be used in 4CAT processors. ' if self.import_error_count != 0 else ''}{str(self.import_warning_count) + ' item(s) missing some data fields. ' if self.import_warning_count != 0 else ''}\n\nMissing data is noted in the `missing_fields` column of this dataset's CSV file; see also the dataset log for details.", is_final=True)
            self.dataset.finish(num_rows=num_items)

    def search(self, query):
        """
        Search for items matching the given query

        The real work is done by the get_items() method of the descending
        class. This method just provides some scaffolding and processing
        of results via `after_search()`, if it is defined.

        :param dict query: Query parameters
        :return: Iterable of matching items, or None if there are no results.
        """
        items = self.get_items(query)

        if not items:
            return None

        # search workers may define an 'after_search' hook that is called after
        # the query is first completed
        if hasattr(self, "after_search") and callable(self.after_search):
            items = self.after_search(items)

        return items

    @abstractmethod
    def get_items(self, query):
        """
        Method to fetch items with for a given query

        To be implemented by descending classes!

        :param dict query: Query parameters
        :return Generator: A generator or iterable that returns items
            collected according to the provided parameters.
        """
        pass

    def import_from_file(self, path):
        """
        Import items from an external file

        By default, this reads a file and parses each line as JSON, returning
        the parsed object as an item. This works for NDJSON files. Data sources
        that require importing from other or multiple file types can overwrite
        this method.

        This method has a generic implementation, but in most cases would be
        redefined in descending classes to account for nuances in incoming data
        for a given data source.

        The file is considered disposable and deleted after importing.

        :param str path: Path to read from
        :return Generator: Yields all items in the file, item for item.
        """
        if type(path) is not Path:
            path = Path(path)
        if not path.exists():
            return []

        import_warnings = {}

        # Check if processor and dataset can use map_item
        check_map_item = self.map_item_method_available(dataset=self.dataset)
        if not check_map_item:
            self.log.warning(
                f"Processor {self.type} importing item without map_item method for Dataset {self.dataset.type} - {self.dataset.key}")

        with path.open(encoding="utf-8") as infile:
            unmapped_items = False
            for i, line in enumerate(infile):
                if self.interrupted:
                    raise WorkerInterruptedException()

                try:
                    # remove NUL bytes here because they trip up a lot of other
                    # things
                    # also include import metadata in item
                    item = json.loads(line.replace("\0", ""))
                except json.JSONDecodeError:
                    warning = (f"An item on line {i:,} of the imported file could not be parsed as JSON - this may "
                               f"indicate that the file you uploaded was incomplete and you need to try uploading it "
                               f"again. The item will be ignored.")

                    if warning not in import_warnings:
                        import_warnings[warning] = 0
                    import_warnings[warning] += 1
                    continue

                new_item = {
                    **item["data"],
                    "__import_meta": {k: v for k, v in item.items() if k != "data"}
                }

                # Check map item here!
                if check_map_item:
                    try:
                        mapped_item = self.get_mapped_item(new_item)

                        # keep track of items that raised a warning
                        # this means the item could be mapped, but there is
                        # some information the user should take note of
                        warning = mapped_item.get_message()
                        if not warning and mapped_item.get_missing_fields():
                            # usually this would have an explicit warning, but
                            # if not it's still useful to know
                            warning = f"The following fields are missing for this item and will be replaced with a default value: {', '.join(mapped_item.get_missing_fields())}"

                        if warning:
                            if warning not in import_warnings:
                                import_warnings[warning] = 0
                            import_warnings[warning] += 1
                            self.import_warning_count += 1

                    except MapItemException as e:
                        # NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
                        self.import_error_count += 1
                        self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e, warn_admins=unmapped_items is False)
                        unmapped_items = True

                yield new_item

        # warnings were raised about some items
        # log these, with the number of items each warning applied to
        if sum(import_warnings.values()) > 0:
            self.dataset.log("While importing, the following issues were raised:")
            for warning, num_items in import_warnings.items():
                self.dataset.log(f" {warning} (for {num_items:,} item(s))")

        path.unlink()
        self.dataset.delete_parameter("file")

    def items_to_csv(self, results, filepath):
        """
        Takes a dictionary of results, converts it to a csv, and writes it to the
        given location. This is mostly a generic dictionary-to-CSV processor but
        some specific processing is done on the "body" key to strip HTML from it,
        and a human-readable timestamp is provided next to the UNIX timestamp.

        :param Iterable results: List of dict rows from data source.
        :param Path filepath: Filepath for the resulting csv

        :return int: Amount of items that were processed
        """
        if not filepath:
            raise ResourceWarning("No result file for query")

        # write the dictionary to a csv
        if not isinstance(filepath, Path):
            filepath = Path(filepath)

        # cache hashed author names, so the hashing function (which is
        # relatively expensive) is not run too often
        pseudonymise_author = self.parameters.get("pseudonymise", None) == "pseudonymise"
        anonymise_author = self.parameters.get("pseudonymise", None) == "anonymise"

        # prepare hasher (which we may or may not need)
        # we use BLAKE2 for its (so far!) resistance against cryptanalysis and
        # speed, since we will potentially need to calculate a large amount of
        # hashes
        salt = secrets.token_bytes(16)
        hasher = hashlib.blake2b(digest_size=24, salt=salt)
        hash_cache = HashCache(hasher)

        processed = 0
        header_written = False
        with filepath.open("w", encoding="utf-8") as csvfile:
            # Parsing: remove the HTML tags, but keep the <br> as a newline
            # Takes around 1.5 times longer
            for row in results:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while writing results to file")

                if not header_written:
                    fieldnames = list(row.keys())
                    fieldnames.append("unix_timestamp")
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames, lineterminator='\n')
                    writer.writeheader()
                    header_written = True

                processed += 1

                # Create human dates from timestamp
                from datetime import datetime, timezone

                if "timestamp" in row:
                    # Data sources should have "timestamp" as a unix epoch integer,
                    # but do some conversion if this is not the case.
                    timestamp = row["timestamp"]
                    if not isinstance(timestamp, int):
                        if isinstance(timestamp, str) and "-" not in timestamp:  # String representation of epoch timestamp
                            timestamp = int(timestamp)
                        elif isinstance(timestamp, str) and "-" in timestamp:  # Date string
                            try:
                                timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc).timestamp()
                            except ValueError:
                                timestamp = "undefined"
                        else:
                            timestamp = "undefined"

                    # Add a human-readable date format as well, if we have a valid timestamp.
                    row["unix_timestamp"] = timestamp
                    if timestamp != "undefined":
                        row["timestamp"] = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
                    else:
                        row["timestamp"] = timestamp
                else:
                    row["timestamp"] = "undefined"

                # Parse html to text
                if row["body"]:
                    row["body"] = strip_tags(row["body"])

                # replace author column with salted hash of the author name, if
                # pseudonymisation is enabled
                if pseudonymise_author:
                    author_fields = [field for field in row.keys() if field.startswith("author")]
                    for author_field in author_fields:
                        row[author_field] = hash_cache.update_cache(row[author_field])

                # or remove data altogether, if it's anonymisation instead
                elif anonymise_author:
                    for field in row.keys():
                        if field.startswith("author"):
                            row[field] = "REDACTED"

                row = remove_nuls(row)
                writer.writerow(row)

        return processed

    def items_to_ndjson(self, items, filepath):
        """
        Save retrieved items as an ndjson file

        NDJSON is a file with one valid JSON value per line, in this case each
        of these JSON values represents a retrieved item. This is useful if the
        retrieved data cannot easily be completely stored as a flat CSV file
        and we want to leave the choice of how to flatten it to the user. Note
        that no conversion (e.g. html stripping or pseudonymisation) is done
        here - the items are saved as-is.

        :param Iterator items: Items to save
        :param Path filepath: Location to save results file
        """
        if not filepath:
            raise ResourceWarning("No valid results path supplied")

        # figure out if we need to filter the data somehow
        hash_cache = None
        if self.parameters.get("pseudonymise") == "pseudonymise":
            # cache hashed author names, so the hashing function (which is
            # relatively expensive) is not run too often
            hasher = hashlib.blake2b(digest_size=24)
            hasher.update(str(config.get('ANONYMISATION_SALT')).encode("utf-8"))
            hash_cache = HashCache(hasher)

        processed = 0
        with filepath.open("w", encoding="utf-8", newline="") as outfile:
            for item in items:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while writing results to file")

                # if pseudo/anonymising, filter data recursively
                if self.parameters.get("pseudonymise") == "pseudonymise":
                    item = dict_search_and_update(item, ["author*"], hash_cache.update_cache)
                elif self.parameters.get("anonymise") == "anonymise":
                    item = dict_search_and_update(item, ["author*"], lambda v: "REDACTED")

                outfile.write(json.dumps(item) + "\n")
                processed += 1

        return processed

    def items_to_archive(self, items, filepath):
        """
        Save retrieved items as an archive

        Assumes that items is an iterable with one item, a Path object
        referring to a folder containing files to be archived. The folder will
        be removed afterwards.

        :param items:
        :param filepath: Where to store the archive
        :return int: Number of items
        """
        num_items = len(os.listdir(items))
        self.write_archive_and_finish(items, None, zipfile.ZIP_STORED, False)
        return num_items


class SearchWithScope(Search, ABC):
    """
    Search class with more complex search pathways

    Some datasources may afford more complex search modes besides simply
    returning all items matching a given set of parameters. In particular,
    they may allow for expanding the search scope to the thread in which a
    given matching item occurs. This subclass allows for the following
    additional search modes:

    - All items in a thread containing a matching item
    - All items in a thread containing at least x% matching items
    """

    def search(self, query):
        """
        Complex search

        Allows for two separate search pathways, one of which is chosen based
        on the search query. Additionally, extra items are added to the results
        if a wider search scope is requested.

        :param dict query: Query parameters
        :return: Matching items, as iterable, or None if no items match.
        """
        mode = self.get_search_mode(query)

        if mode == "simple":
            items = self.get_items_simple(query)
        else:
            items = self.get_items_complex(query)

        if not items:
            return None

        # handle the various search scope options after retrieving initial item
        # list
        if query.get("search_scope", None) == "dense-threads":
            # dense threads - all items in all threads in which the requested
            # proportion of items matches
            # first, get amount of items for all threads in which matching
            # items occur and that are long enough
            thread_ids = tuple([item["thread_id"] for item in items])
            self.dataset.update_status("Retrieving thread metadata for %i threads" % len(thread_ids))
            try:
                min_length = int(query.get("scope_length", 30))
            except ValueError:
                min_length = 30

            thread_sizes = self.get_thread_sizes(thread_ids, min_length)

            # determine how many matching items occur per thread in the initial
            # data set
            items_per_thread = {}
            for item in items:
                if item["thread_id"] not in items_per_thread:
                    items_per_thread[item["thread_id"]] = 0

                items_per_thread[item["thread_id"]] += 1

            # keep all thread IDs where that amount is more than the requested
            # density
            qualifying_thread_ids = set()

            self.dataset.update_status("Filtering dense threads")
            try:
                percentage = int(query.get("scope_density")) / 100
            except (ValueError, TypeError):
                percentage = 0.15

            for thread_id in items_per_thread:
                if thread_id not in thread_sizes:
                    # thread not long enough
                    continue
                required_items = math.ceil(percentage * thread_sizes[thread_id])
                if items_per_thread[thread_id] >= required_items:
                    qualifying_thread_ids.add(thread_id)

            if len(qualifying_thread_ids) > 25000:
                self.dataset.update_status(
                    "Too many matching threads (%i) to get full thread data for, aborting. Please try again with a narrower query." % len(qualifying_thread_ids))
                return None

            if qualifying_thread_ids:
                self.dataset.update_status("Fetching all items in %i threads" % len(qualifying_thread_ids))
                items = self.fetch_threads(tuple(qualifying_thread_ids))
            else:
                self.dataset.update_status("No threads matched the full thread search parameters.")
                return None

        elif query.get("search_scope", None) == "full-threads":
            # get all items in threads containing at least one matching item
            thread_ids = tuple(set([item["thread_id"] for item in items]))
            if len(thread_ids) > 25000:
                self.dataset.update_status(
                    "Too many matching threads (%i) to get full thread data for, aborting. Please try again with a narrower query." % len(thread_ids))
                return None

            self.dataset.update_status("Retrieving all items from %i threads" % len(thread_ids))
            items = self.fetch_threads(thread_ids)

        elif mode == "complex":
            # create a random sample subset of all items if requested. for
            # complex queries, this can usually only be done at this point;
            # for simple queries, this is handled in get_items_simple
            if query.get("search_scope", None) == "random-sample":
                try:
                    self.dataset.update_status("Creating random sample")
                    sample_size = int(query.get("sample_size", 5000))
                    items = list(items)
                    random.shuffle(items)
                    return items[0:sample_size]
                except ValueError:
                    pass

        # search workers may define an 'after_search' hook that is called after
        # the query is first completed
        if hasattr(self, "after_search") and callable(self.after_search):
            items = self.after_search(items)

        return items

    def get_items(self, query):
        """
        Not available in this subclass
        """
        raise NotImplementedError("Cannot use get_items() directly in SearchWithScope")

    def get_search_mode(self, query):
        """
        Determine what search mode to use

        Can be overridden by child classes!

        :param dict query: Query parameters
        :return str: 'simple' or 'complex'
        """
        if query.get("body_match", None) or query.get("subject_match", None):
            mode = "complex"
        else:
            mode = "simple"

        return mode

    @abstractmethod
    def get_items_simple(self, query):
        """
        Get items via the simple pathway

        If `get_search_mode()` returned `"simple"`, this method is used to
        retrieve items. What this method does exactly is up to the descending
        class.

        :param dict query: Query parameters
        :return Iterable: Items that match the parameters
        """
        pass

    @abstractmethod
    def get_items_complex(self, query):
        """
        Get items via the complex pathway

        If `get_search_mode()` returned `"complex"`, this method is used to
        retrieve items. What this method does exactly is up to the descending
        class.

        :param dict query: Query parameters
        :return Iterable: Items that match the parameters
        """
        pass

    @abstractmethod
    def fetch_posts(self, post_ids, where=None, replacements=None):
        """
        Get items for given IDs

        :param Iterable post_ids: Post IDs to e.g. match against a database
        :param where: Deprecated, do not use
        :param replacements: Deprecated, do not use
        :return Iterable[dict]: Post objects
        """
        pass

    @abstractmethod
    def fetch_threads(self, thread_ids):
        """
        Get items for given thread IDs

        :param Iterable thread_ids: Thread IDs to e.g. match against a database
        :return Iterable[dict]: Post objects
        """
        pass

    @abstractmethod
    def get_thread_sizes(self, thread_ids, min_length):
        """
        Get thread lengths for all threads

        :param tuple thread_ids: List of thread IDs to fetch lengths for
        :param int min_length: Min length for a thread to be included in the
            results
        :return dict: Threads sizes, with thread IDs as keys
        """
        pass
class Search(BasicProcessor, ABC):
Process search queries from the front-end
This class can be descended from to define a 'search worker', which collects items from a given data source to create a dataset according to parameters provided by the user via the web interface.
Each data source defines a search worker that contains code to interface with e.g. an API or a database server. The search worker also contains a definition of the parameters that can be configured by the user, via the options attribute and/or the get_options() class method.
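For instance, a data source could define its search worker along the following lines. This is a minimal, hypothetical sketch: the "exampleforum" names, the stand-in post list and the option layout are illustrative, not part of 4CAT.

from backend.lib.search import Search

# stand-in for an actual API client or database; a real worker would query the data source here
EXAMPLE_POSTS = [
    {"thread": "1", "text": "Hello world", "title": "First post", "epoch": 1700000000},
]


class SearchExampleForum(Search):
    """
    Hypothetical search worker for an imaginary 'exampleforum' data source
    """
    type = "exampleforum-search"  # identifier; ends in '-search' by convention
    extension = "ndjson"          # results will be written by items_to_ndjson()
    max_workers = 1

    # user-configurable parameters; the exact option format follows 4CAT's
    # form-definition conventions and is only sketched here
    options = {
        "query": {"type": "string", "help": "Search query"}
    }

    def get_items(self, query):
        # yield one dict per collected item, with the mandatory columns
        for post in EXAMPLE_POSTS:
            yield {
                "thread_id": post["thread"],
                "body": post["text"],
                "subject": post["title"],
                "timestamp": post["epoch"],
            }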
def process(self):
Create 4CAT dataset from a data source
Gets query details, passes them on to the object's search method, and writes the results to a file. If that all went well, the query and job are marked as finished.
def search(self, query):
Search for items matching the given query
The real work is done by the get_items() method of the descending class. This method just provides some scaffolding and processing of results via after_search(), if it is defined.
Parameters
- dict query: Query parameters
Returns
Iterable of matching items, or None if there are no results.
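As an illustration of the after_search() hook, a subclass might deduplicate results before they are written out. A sketch, under the assumption that items carry an "id" field (which is not guaranteed by this base class):

class DeduplicatingExampleSearch(Search):
    # type, options and get_items() omitted for brevity

    def after_search(self, items):
        # called by search() once get_items() has produced results;
        # here it drops items whose 'id' was already seen
        seen_ids = set()
        for item in items:
            if item.get("id") in seen_ids:
                continue
            seen_ids.add(item.get("id"))
            yield item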
@abstractmethod
def get_items(self, query):
Method to fetch items for a given query
To be implemented by descending classes!
Parameters
- dict query: Query parameters
Returns
A generator or iterable that returns items collected according to the provided parameters.
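In practice, a get_items() implementation is usually a generator that pages through an API or database and checks self.interrupted so the backend can stop it cleanly; WorkerInterruptedException is the exception that process() catches for that purpose. A hedged sketch, where fetch_page() stands in for a real, hypothetical API call:

def fetch_page(search_term, page):
    # stand-in for a real API call; returns one page of fake results, then nothing
    if page > 1:
        return []
    return [{"thread": "1", "text": search_term, "title": "Example", "epoch": 1700000000}]


# inside a hypothetical Search subclass:
def get_items(self, query):
    page = 1
    while True:
        if self.interrupted:
            raise WorkerInterruptedException("Interrupted while collecting items")

        batch = fetch_page(query.get("query", ""), page)
        if not batch:
            break

        for post in batch:
            yield {
                "thread_id": post["thread"],
                "body": post["text"],
                "subject": post.get("title", ""),
                "timestamp": post["epoch"],
            }

        page += 1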
def import_from_file(self, path):
Import items from an external file
By default, this reads a file and parses each line as JSON, returning the parsed object as an item. This works for NDJSON files. Data sources that require importing from other or multiple file types can override this method.
This method has a generic implementation, but in most cases would be redefined in descending classes to account for nuances in incoming data for a given data source.
The file is considered disposable and deleted after importing.
Parameters
- str path: Path to read from
Returns
Yields all items in the file, item for item.
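For the default implementation, each line of the imported file is expected to be a JSON object with the item itself under a data key; any other keys on that line are kept as import metadata under __import_meta. A small sketch of that unpacking (the collected_at metadata key is illustrative):

import json

line = '{"data": {"thread_id": "123", "body": "Hello", "subject": "", "timestamp": 1700000000}, "collected_at": 1700000100}'

item = json.loads(line)
new_item = {
    **item["data"],
    "__import_meta": {k: v for k, v in item.items() if k != "data"}
}
# new_item now holds the original data fields plus
# "__import_meta": {"collected_at": 1700000100}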
def items_to_csv(self, results, filepath):
Takes a dictionary of results, converts it to a csv, and writes it to the given location. This is mostly a generic dictionary-to-CSV processor but some specific processing is done on the "body" key to strip HTML from it, and a human-readable timestamp is provided next to the UNIX timestamp.
Parameters
- Iterable results: List of dict rows from data source.
- Path filepath: Filepath for the resulting csv
Returns
Amount of items that were processed
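The timestamp handling amounts to: keep the epoch value in a new unix_timestamp column and replace timestamp with a human-readable UTC date, falling back to "undefined" when the value cannot be interpreted. A standalone sketch of that conversion (not the actual helper used by the method, just the same logic in isolation):

from datetime import datetime, timezone

def normalise_timestamp(value):
    # mirrors the per-row conversion in items_to_csv()
    if not isinstance(value, int):
        if isinstance(value, str) and "-" not in value:  # epoch as string
            value = int(value)
        elif isinstance(value, str) and "-" in value:  # date string
            try:
                value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc).timestamp()
            except ValueError:
                return "undefined", "undefined"
        else:
            return "undefined", "undefined"
    return value, datetime.utcfromtimestamp(value).strftime("%Y-%m-%d %H:%M:%S")

print(normalise_timestamp(1700000000))  # (1700000000, '2023-11-14 22:13:20')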
def items_to_ndjson(self, items, filepath):
Save retrieved items as an ndjson file
NDJSON is a file with one valid JSON value per line, in this case each of these JSON values represents a retrieved item. This is useful if the retrieved data cannot easily be completely stored as a flat CSV file and we want to leave the choice of how to flatten it to the user. Note that no conversion (e.g. html stripping or pseudonymisation) is done here - the items are saved as-is.
Parameters
- Iterator items: Items to save
- Path filepath: Location to save results file
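When pseudonymisation or anonymisation is enabled, author fields are rewritten with dict_search_and_update(), which the source describes as filtering the data recursively. A hedged illustration of the anonymisation case (the item is made up, and in the real code the pseudonymisation callable is HashCache.update_cache rather than a lambda):

from common.lib.helpers import dict_search_and_update

item = {"author": "alice", "author_id": "42", "body": "hi"}
item = dict_search_and_update(item, ["author*"], lambda v: "REDACTED")
# keys matching 'author*' ('author', 'author_id') now hold "REDACTED"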
def items_to_archive(self, items, filepath):
Save retrieved items as an archive
Assumes that items is an iterable with one item, a Path object referring to a folder containing files to be archived. The folder will be removed afterwards.
Parameters
- items: Iterable with a single Path to the folder containing the files to archive
- filepath: Where to store the archive
Returns
Number of items
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- extension
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor
class SearchWithScope(Search, ABC):
Search class with more complex search pathways
Some datasources may afford more complex search modes besides simply returning all items matching a given set of parameters. In particular, they may allow for expanding the search scope to the thread in which a given matching item occurs. This subclass allows for the following additional search modes:
- All items in a thread containing a matching item
- All items in a thread containing at least x% matching items
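For orientation, below is a minimal sketch of what a descending search worker might look like. The data source name, the `...` method bodies, and the field layout are illustrative assumptions, not part of 4CAT itself; a real worker would query an API or database in these methods.

from backend.lib.search import SearchWithScope

class SearchExampleForum(SearchWithScope):
    """
    Hypothetical search worker for an imaginary forum-like data source.
    Only the structure is illustrative.
    """
    type = "exampleforum-search"  # identifier should end in '-search'
    max_workers = 1
    extension = "csv"

    def get_items_simple(self, query):
        # e.g. read everything for a given board/date range from a local table
        ...

    def get_items_complex(self, query):
        # e.g. apply body_match/subject_match filters on top of the simple query
        ...

    def fetch_posts(self, post_ids, where=None, replacements=None):
        # return full post objects for the given post IDs
        ...

    def fetch_threads(self, thread_ids):
        # return all posts belonging to the given threads
        ...

    def get_thread_sizes(self, thread_ids, min_length):
        # return {thread_id: post_count} for threads with at least min_length posts
        ...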
    def search(self, query):
        """
        Complex search

        Allows for two separate search pathways, one of which is chosen based
        on the search query. Additionally, extra items are added to the results
        if a wider search scope is requested.

        :param dict query: Query parameters
        :return: Matching items, as iterable, or None if no items match.
        """
        mode = self.get_search_mode(query)

        if mode == "simple":
            items = self.get_items_simple(query)
        else:
            items = self.get_items_complex(query)

        if not items:
            return None

        # handle the various search scope options after retrieving initial item
        # list
        if query.get("search_scope", None) == "dense-threads":
            # dense threads - all items in all threads in which the requested
            # proportion of items matches
            # first, get amount of items for all threads in which matching
            # items occur and that are long enough
            thread_ids = tuple([item["thread_id"] for item in items])
            self.dataset.update_status("Retrieving thread metadata for %i threads" % len(thread_ids))
            try:
                min_length = int(query.get("scope_length", 30))
            except ValueError:
                min_length = 30

            thread_sizes = self.get_thread_sizes(thread_ids, min_length)

            # determine how many matching items occur per thread in the initial
            # data set
            items_per_thread = {}
            for item in items:
                if item["thread_id"] not in items_per_thread:
                    items_per_thread[item["thread_id"]] = 0

                items_per_thread[item["thread_id"]] += 1

            # keep all thread IDs where that amount is more than the requested
            # density
            qualifying_thread_ids = set()

            self.dataset.update_status("Filtering dense threads")
            try:
                percentage = int(query.get("scope_density")) / 100
            except (ValueError, TypeError):
                percentage = 0.15

            for thread_id in items_per_thread:
                if thread_id not in thread_sizes:
                    # thread not long enough
                    continue
                required_items = math.ceil(percentage * thread_sizes[thread_id])
                if items_per_thread[thread_id] >= required_items:
                    qualifying_thread_ids.add(thread_id)

            if len(qualifying_thread_ids) > 25000:
                self.dataset.update_status("Too many matching threads (%i) to get full thread data for, aborting. Please try again with a narrower query." % len(qualifying_thread_ids))
                return None

            if qualifying_thread_ids:
                self.dataset.update_status("Fetching all items in %i threads" % len(qualifying_thread_ids))
                items = self.fetch_threads(tuple(qualifying_thread_ids))
            else:
                self.dataset.update_status("No threads matched the full thread search parameters.")
                return None

        elif query.get("search_scope", None) == "full-threads":
            # get all items in threads containing at least one matching item
            thread_ids = tuple(set([item["thread_id"] for item in items]))
            if len(thread_ids) > 25000:
                self.dataset.update_status("Too many matching threads (%i) to get full thread data for, aborting. Please try again with a narrower query." % len(thread_ids))
                return None

            self.dataset.update_status("Retrieving all items from %i threads" % len(thread_ids))
            items = self.fetch_threads(thread_ids)

        elif mode == "complex":
            # create a random sample subset of all items if requested. for
            # complex queries, this can usually only be done at this point;
            # for simple queries, this is handled in get_items_simple
            if query.get("search_scope", None) == "random-sample":
                try:
                    self.dataset.update_status("Creating random sample")
                    sample_size = int(query.get("sample_size", 5000))
                    items = list(items)
                    random.shuffle(items)
                    return items[0:sample_size]
                except ValueError:
                    pass

        # search workers may define an 'after_search' hook that is called after
        # the query is first completed
        if hasattr(self, "after_search") and callable(self.after_search):
            items = self.after_search(items)

        return items
Complex search
Allows for two separate search pathways, one of which is chosen based on the search query. Additionally, extra items are added to the results if a wider search scope is requested.
Parameters
- dict query: Query parameters
Returns
Matching items, as iterable, or None if no items match.
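The dense-threads branch reduces to a small amount of arithmetic. The snippet below reproduces that calculation on made-up numbers to show which threads would qualify; the thread IDs and counts are illustrative only.

import math

# matching items found per thread in the initial result set (illustrative)
items_per_thread = {"t1": 4, "t2": 1, "t3": 12}
# total size of each thread that met the minimum length; "t2" was too short
thread_sizes = {"t1": 20, "t3": 200}

percentage = 15 / 100  # a scope_density of 15%

qualifying = set()
for thread_id, matches in items_per_thread.items():
    if thread_id not in thread_sizes:
        continue  # thread not long enough
    required = math.ceil(percentage * thread_sizes[thread_id])
    if matches >= required:
        qualifying.add(thread_id)

print(qualifying)  # {'t1'}: 4 >= ceil(0.15 * 20) = 3, while 12 < ceil(0.15 * 200) = 30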
    def get_items(self, query):
        """
        Not available in this subclass
        """
        raise NotImplementedError("Cannot use get_items() directly in SearchWithScope")
Not available in this subclass
    def get_search_mode(self, query):
        """
        Determine what search mode to use

        Can be overridden by child classes!

        :param dict query: Query parameters
        :return str: 'simple' or 'complex'
        """
        if query.get("body_match", None) or query.get("subject_match", None):
            mode = "complex"
        else:
            mode = "simple"

        return mode
Determine what search mode to use
Can be overridden by child classes!
Parameters
- dict query: Query parameters
Returns
'simple' or 'complex'
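As a quick illustration of the default behaviour, the standalone function below mirrors the logic of get_search_mode() so it can be tried on hypothetical query dictionaries.

def default_search_mode(query):
    # mirrors SearchWithScope.get_search_mode(): any text filter means "complex"
    if query.get("body_match", None) or query.get("subject_match", None):
        return "complex"
    return "simple"

print(default_search_mode({"board": "all"}))                # simple
print(default_search_mode({"body_match": "climate"}))       # complex
print(default_search_mode({"subject_match": "elections"}))  # complex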
    @abstractmethod
    def get_items_simple(self, query):
        """
        Get items via the simple pathway

        If `get_search_mode()` returned `"simple"`, this method is used to
        retrieve items. What this method does exactly is up to the descending
        class.

        :param dict query: Query parameters
        :return Iterable: Items that match the parameters
        """
        pass
Get items via the simple pathway
If get_search_mode() returned "simple", this method is used to retrieve items. What this method does exactly is up to the descending class.
Parameters
- dict query: Query parameters
Returns
Items that match the parameters
    @abstractmethod
    def get_items_complex(self, query):
        """
        Get items via the complex pathway

        If `get_search_mode()` returned `"complex"`, this method is used to
        retrieve items. What this method does exactly is up to the descending
        class.

        :param dict query: Query parameters
        :return Iterable: Items that match the parameters
        """
        pass
Get items via the complex pathway
If get_search_mode() returned "complex", this method is used to retrieve items. What this method does exactly is up to the descending class.
Parameters
- dict query: Query parameters
Returns
Items that match the parameters
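To make the split between the two pathways concrete, here is a sketch of how a hypothetical in-memory data source might implement both methods. The sample items and the substring matching are assumptions for illustration; a real data source would query an API or database and may support richer matching.

SAMPLE_ITEMS = [
    {"thread_id": "t1", "subject": "Weather", "body": "It rained all week", "timestamp": 1700000000},
    {"thread_id": "t2", "subject": "Sports", "body": "Great match yesterday", "timestamp": 1700000100},
]

def get_items_simple(query):
    # simple pathway: no text filters, return everything
    return iter(SAMPLE_ITEMS)

def get_items_complex(query):
    # complex pathway: apply the body_match/subject_match filters from the query
    body = (query.get("body_match") or "").lower()
    subject = (query.get("subject_match") or "").lower()
    for item in SAMPLE_ITEMS:
        if body and body not in item["body"].lower():
            continue
        if subject and subject not in item["subject"].lower():
            continue
        yield item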
    @abstractmethod
    def fetch_posts(self, post_ids, where=None, replacements=None):
        """
        Get items for given IDs

        :param Iterable post_ids: Post IDs to e.g. match against a database
        :param where: Deprecated, do not use
        :param replacements: Deprecated, do not use
        :return Iterable[dict]: Post objects
        """
        pass
Get items for given IDs
Parameters
- Iterable post_ids: Post IDs to e.g. match against a database
- where: Deprecated, do not use
- replacements: Deprecated, do not use
Returns
Post objects
    @abstractmethod
    def fetch_threads(self, thread_ids):
        """
        Get items for given thread IDs

        :param Iterable thread_ids: Thread IDs to e.g. match against a database
        :return Iterable[dict]: Post objects
        """
        pass
Get items for given thread IDs
Parameters
- Iterable thread_ids: Thread IDs to e.g. match against a database
Returns
Post objects
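For data sources backed by a local database, fetch_posts and fetch_threads typically become straightforward lookups. Below is a sketch against a hypothetical SQLite table named posts (columns id, thread_id, body, subject, timestamp); the table name, columns, and the explicit db_path argument are assumptions, whereas the real methods would use the worker's own database handle.

import sqlite3

def fetch_posts(db_path, post_ids):
    # look up full post records for the given post IDs
    placeholders = ",".join("?" for _ in post_ids)
    with sqlite3.connect(db_path) as connection:
        connection.row_factory = sqlite3.Row
        rows = connection.execute(
            "SELECT id, thread_id, body, subject, timestamp FROM posts WHERE id IN (%s)" % placeholders,
            tuple(post_ids))
        return [dict(row) for row in rows]

def fetch_threads(db_path, thread_ids):
    # look up every post belonging to the given threads
    placeholders = ",".join("?" for _ in thread_ids)
    with sqlite3.connect(db_path) as connection:
        connection.row_factory = sqlite3.Row
        rows = connection.execute(
            "SELECT id, thread_id, body, subject, timestamp FROM posts WHERE thread_id IN (%s) ORDER BY timestamp" % placeholders,
            tuple(thread_ids))
        return [dict(row) for row in rows]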
    @abstractmethod
    def get_thread_sizes(self, thread_ids, min_length):
        """
        Get thread lengths for all threads

        :param tuple thread_ids: List of thread IDs to fetch lengths for
        :param int min_length: Min length for a thread to be included in the
                               results
        :return dict: Thread sizes, with thread IDs as keys
        """
        pass
Get thread lengths for all threads
Parameters
- tuple thread_ids: List of thread IDs to fetch lengths for
- int min_length: Min length for a thread to be included in the results
Returns
Thread sizes, with thread IDs as keys
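Continuing the hypothetical SQLite-backed example above, a sketch of get_thread_sizes might count posts per thread and keep only sufficiently long threads; again, the posts table and db_path argument are assumptions for illustration.

import sqlite3

def get_thread_sizes(db_path, thread_ids, min_length):
    # count posts per thread, keeping only threads with at least min_length posts
    placeholders = ",".join("?" for _ in thread_ids)
    with sqlite3.connect(db_path) as connection:
        rows = connection.execute(
            "SELECT thread_id, COUNT(*) AS num_posts FROM posts "
            "WHERE thread_id IN (%s) GROUP BY thread_id HAVING COUNT(*) >= ?" % placeholders,
            tuple(thread_ids) + (min_length,))
        return {thread_id: num_posts for thread_id, num_posts in rows}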
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- Search
- type
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- extension
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor