
common.lib.dataset

   1import collections
   2import itertools
   3import datetime
   4import hashlib
   5import fnmatch
   6import random
   7import shutil
   8import json
   9import time
  10import csv
  11import re
  12
  13from pathlib import Path
  14
  15from common.config_manager import config
  16from common.lib.job import Job, JobNotFoundException
  17from common.lib.module_loader import ModuleCollector
  18from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api
  19from common.lib.item_mapping import MappedItem, MissingMappedField, DatasetItem
  20from common.lib.fourcat_module import FourcatModule
  21from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
  22								   MapItemException, MappedItemIncompleteException)
  23
  24
  25class DataSet(FourcatModule):
  26	"""
  27	Provide interface to safely register and run operations on a dataset
  28
  29	A dataset is a collection of:
  30	- A unique identifier
  31	- A set of parameters that demarcate the data contained within
  32	- The data
  33
  34	The data is usually stored in a file on the disk; the parameters are stored
  35	in a database. The handling of the data, et cetera, is done by other
  36	workers; this class defines methods to create and manipulate the dataset's
  37	properties.
  38	"""
  39	# Attributes must be created here to ensure getattr and setattr work properly
  40	data = None
  41	key = ""
  42
  43	children = None
  44	available_processors = None
  45	genealogy = None
  46	preset_parent = None
  47	parameters = None
  48	modules = None
  49
  50	owners = None
  51	tagged_owners = None
  52
  53	db = None
  54	folder = None
  55	is_new = True
  56
  57	no_status_updates = False
  58	staging_areas = None
  59	_queue_position = None
  60
  61	def __init__(self, parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None,
  62				 type=None, is_private=True, owner="anonymous", modules=None):
  63		"""
  64		Create new dataset object
  65
  66		If the dataset is not in the database yet, it is added.
  67
  68		:param dict parameters:  Only when creating a new dataset. Dataset
  69		parameters, free-form dictionary.
  70		:param str key: Dataset key. If given, dataset with this key is loaded.
  71		:param int job: Job ID. If given, dataset corresponding to job is
  72		loaded.
  73		:param dict data: Dataset data, corresponding to a row in the datasets
  74		database table. If not given, retrieved from database depending on key.
  75		:param db:  Database connection
  76		:param str parent:  Only when creating a new dataset. Key of the parent
  77		dataset, of which the dataset being created will be a child.
  78		:param str extension: Only when creating a new dataset. Extension of
  79		dataset result file.
  80		:param str type: Only when creating a new dataset. Type of the dataset,
  81		corresponding to the type property of a processor class.
  82		:param bool is_private: Only when creating a new dataset. Whether the
  83		dataset is private or public.
  84		:param str owner: Only when creating a new dataset. The user name of
  85		the dataset's creator.
  86		:param modules: Module cache. If not given, will be loaded when needed
  87		(expensive). Used to figure out what processors are compatible with
  88		this dataset.
  89		"""
  90		self.db = db
  91		self.folder = config.get('PATH_ROOT').joinpath(config.get('PATH_DATA'))
  92		# Ensure mutable attributes are set in __init__ as they are unique to each DataSet
  93		self.data = {}
  94		self.parameters = {}
  95		self.children = []
  96		self.available_processors = {}
  97		self.genealogy = []
  98		self.staging_areas = []
  99		self.modules = modules
 100
 101		if key is not None:
 102			self.key = key
 103			current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
 104			if not current:
 105				raise DataSetNotFoundException("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key)
 106
 107		elif job is not None:
 108			current = self.db.fetchone("SELECT * FROM datasets WHERE parameters::json->>'job' = %s", (job,))
 109			if not current:
 110				raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
 111
 112			self.key = current["key"]
 113		elif data is not None:
 114			current = data
 115			if "query" not in data or "key" not in data or "parameters" not in data or "key_parent" not in data:
 116				raise DataSetException("DataSet() requires a complete dataset record for its 'data' argument")
 117
 118			self.key = current["key"]
 119		else:
 120			if parameters is None:
 121				raise DataSetException("DataSet() requires either 'key', or 'parameters' to be given")
 122
 123			if not type:
 124				raise DataSetException("Datasets must have their type set explicitly")
 125
 126			query = self.get_label(parameters, default=type)
 127			self.key = self.get_key(query, parameters, parent)
 128			current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s AND query = %s", (self.key, query))
 129
 130		if current:
 131			self.data = current
 132			self.parameters = json.loads(self.data["parameters"])
 133			self.is_new = False
 134		else:
 135			self.data = {"type": type}  # get_own_processor needs this
 136			own_processor = self.get_own_processor()
 137			version = get_software_commit(own_processor)
 138			self.data = {
 139				"key": self.key,
 140				"query": self.get_label(parameters, default=type),
 141				"parameters": json.dumps(parameters),
 142				"result_file": "",
 143				"creator": owner,
 144				"status": "",
 145				"type": type,
 146				"timestamp": int(time.time()),
 147				"is_finished": False,
 148				"is_private": is_private,
 149				"software_version": version[0],
 150				"software_source": version[1],
 151				"software_file": "",
 152				"num_rows": 0,
 153				"progress": 0.0,
 154				"key_parent": parent
 155			}
 156			self.parameters = parameters
 157
 158			self.db.insert("datasets", data=self.data)
 159			self.refresh_owners()
 160			self.add_owner(owner)
 161
 162			# Find desired extension from processor if not explicitly set
 163			if extension is None:
 164				if own_processor:
 165					extension = own_processor.get_extension(parent_dataset=DataSet(key=parent, db=db, modules=self.modules) if parent else None)
 166				# Still no extension, default to 'csv'
 167				if not extension:
 168					extension = "csv"
 169
 170			# Reserve filename and update data['result_file']
 171			self.reserve_result_file(parameters, extension)
 172
 173		# retrieve analyses and processors that may be run for this dataset
 174		analyses = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", (self.key,))
 175		self.children = sorted([DataSet(data=analysis, db=self.db, modules=self.modules) for analysis in analyses],
 176							   key=lambda dataset: dataset.is_finished(), reverse=True)
 177
 178		self.refresh_owners()
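	# Illustrative usage sketch, not part of the original source. `db` and
	# `modules` are assumed to be an existing database connection and module
	# cache; the key, type and owner shown are hypothetical.
	#
	#   # load an existing dataset by its key
	#   dataset = DataSet(key="abcdef0123456789", db=db, modules=modules)
	#
	#   # or register a new one (inserted into the database on construction)
	#   new_dataset = DataSet(
	#       parameters={"datasource": "example", "label": "My dataset"},
	#       type="example-search", owner="some-user", db=db, modules=modules
	#   )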
 179
 180	def check_dataset_finished(self):
 181		"""
 182		Checks if the dataset is finished. Returns the path to the results file
 183		if it is not empty, or 'empty' when there were no matches.
 184
 185		Only returns a path if the dataset is complete. In other words, if this
 186		method returns a path, a file with the complete results for this dataset
 187		will exist at that location.
 188
 189		:return: A path to the results file, 'empty', or `None`
 190		"""
 191		if self.data["is_finished"] and self.data["num_rows"] > 0:
 192			return self.folder.joinpath(self.data["result_file"])
 193		elif self.data["is_finished"] and self.data["num_rows"] == 0:
 194			return 'empty'
 195		else:
 196			return None
 197
 198	def get_results_path(self):
 199		"""
 200		Get path to results file
 201
 202		Always returns a path, which will at some point contain the dataset
 203		data, but may not do so yet. Use this to get the location to write
 204		generated results to.
 205
 206		:return Path:  A path to the results file
 207		"""
 208		return self.folder.joinpath(self.data["result_file"])
 209
 210	def get_results_folder_path(self):
 211		"""
 212		Get path to folder containing accompanying results
 213
 214		Returns a path that may not yet be created
 215
 216		:return Path:  A path to the results folder
 217		"""
 218		return self.folder.joinpath("folder_" + self.key)
 219
 220	def get_log_path(self):
 221		"""
 222		Get path to dataset log file
 223
 224		Each dataset has a single log file that documents its creation. This
 225		method returns the path to that file. It is identical to the path of
 226		the dataset result file, with 'log' as its extension instead.
 227
 228		:return Path:  A path to the log file
 229		"""
 230		return self.get_results_path().with_suffix(".log")
 231
 232	def clear_log(self):
 233		"""
 234		Clears the dataset log file
 235
 236		If the log file does not exist, it is created empty. The log file will
 237		have the same file name as the dataset result file, with the 'log'
 238		extension.
 239		"""
 240		log_path = self.get_log_path()
 241		with log_path.open("w") as outfile:
 242			pass
 243
 244	def log(self, log):
 245		"""
 246		Write log message to file
 247
 248		Writes the log message to the log file on a new line, including a
 249		timestamp at the start of the line. Note that this assumes the log file
 250		already exists - it should have been created/cleared with clear_log()
 251		prior to calling this.
 252
 253		:param str log:  Log message to write
 254		"""
 255		log_path = self.get_log_path()
 256		with log_path.open("a", encoding="utf-8") as outfile:
 257			outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
 258
 259	def _iterate_items(self, processor=None):
 260		"""
 261		A generator that iterates through a CSV or NDJSON file
 262
 263		This is an internal method and should not be called directly. Rather,
 264		call iterate_items() and use the generated dictionary and its properties.
 265
 266		If a reference to a processor is provided, with every iteration,
 267		the processor's 'interrupted' flag is checked, and if set a
 268		ProcessorInterruptedException is raised, which by default is caught
 269		in the worker and subsequently stops execution gracefully.
 270
 271		There are two file types that can be iterated (currently): CSV files
 272		and NDJSON (newline-delimited JSON) files. In the future, one could
 273		envision adding a pathway to retrieve items from e.g. a MongoDB
 274		collection directly instead of from a static file
 275
 276		:param BasicProcessor processor:  A reference to the processor
 277		iterating the dataset.
 278		:return generator:  A generator that yields each item as a dictionary
 279		"""
 280		path = self.get_results_path()
 281
 282		# Yield through items one by one
 283		if path.suffix.lower() == ".csv":
 284			with path.open("rb") as infile:
 285				wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
 286				reader = csv.DictReader(wrapped_infile)
 287
 288				if not self.get_own_processor():
 289					# Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
 290					first_item = next(reader)
 291					if first_item is None or any([True for key in first_item if type(key) is not str]):
 292						raise NotImplementedError(f"Cannot iterate through CSV file (deprecated processor {self.type})")
 293					yield first_item
 294
 295				for item in reader:
 296					if hasattr(processor, "interrupted") and processor.interrupted:
 297						raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file")
 298
 299					yield item
 300
 301		elif path.suffix.lower() == ".ndjson":
 302			# In NDJSON format, each line in the file is a self-contained JSON object
 303			with path.open(encoding="utf-8") as infile:
 304				for line in infile:
 305					if hasattr(processor, "interrupted") and processor.interrupted:
 306						raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file")
 307
 308					yield json.loads(line)
 309
 310		else:
 311			raise NotImplementedError("Cannot iterate through %s file" % path.suffix)
 312
 313	def iterate_items(self, processor=None, warn_unmappable=True, map_missing="default"):
 314		"""
 315		Generate mapped dataset items
 316
 317		Wrapper for _iterate_items that returns a DatasetItem, which can be
 318		accessed as a dict returning the original item or (if a mapper is
 319		available) the mapped item. Mapped or original versions of the item can
 320		also be accessed via the `original` and `mapped_object` properties of
 321		the DatasetItem.
 322
 323		Processors can define a method called `map_item` that can be used to map
 324		an item from the dataset file before it is processed any further. This is
 325		slower than storing the data file in the right format to begin with but
 326		not all data sources allow for easy 'flat' mapping of items, e.g. tweets
 327		are nested objects when retrieved from the twitter API that are easier
 328		to store as a JSON file than as a flat CSV file, and it would be a shame
 329		to throw away that data.
 330
 331		Note the two parameters warn_unmappable and map_missing. Items can be
 332		'unmappable' when their structure is too different to coerce into the
 333		neat dictionary structure the data source expects; warn_unmappable
 334		determines what happens in this case.
 335		It can also be of the right structure, but with some fields missing or
 336		incomplete. map_missing determines what happens in that case. The
 337		latter is for example possible when importing data via Zeeschuimer,
 338		which produces unstably-structured data captured from social media
 339		sites.
 340
 341		:param BasicProcessor processor:  A reference to the processor
 342		iterating the dataset.
 343		:param bool warn_unmappable:  If an item is not mappable, skip the item
 344		and log a warning
 345		:param map_missing: Indicates what to do with mapped items for which
 346		some fields could not be mapped. Defaults to 'default'. Must be one of:
 347		- 'default': fill missing fields with the default passed by map_item
 348		- 'abort': raise a MappedItemIncompleteException if a field is missing
 349		- a callback: replace missing field with the return value of the
 350		  callback. The MappedItem object is passed to the callback as the
 351		  first argument and the name of the missing field as the second.
 352		- a dictionary with a key for each possible missing field: replace missing
 353		  field with a strategy for that field ('default', 'abort', or a callback)
 354
 355		:return generator:  A generator that yields DatasetItems
 356		"""
 357		unmapped_items = False
 358		# Collect item_mapper for use with filter
 359		item_mapper = False
 360		own_processor = self.get_own_processor()
 361		if own_processor and own_processor.map_item_method_available(dataset=self):
 362			item_mapper = True
 363
 364		# missing field strategy can be for all fields at once, or per field
 365		# if it is per field, it is a dictionary with field names and their strategy
 366		# if it is for all fields, it may be a callback, 'abort', or 'default'
 367		default_strategy = "default"
 368		if type(map_missing) is not dict:
 369			default_strategy = map_missing
 370			map_missing = {}
 371
 372		# Loop through items
 373		for i, item in enumerate(self._iterate_items(processor)):
 374			# Save original to yield
 375			original_item = item.copy()
 376
 377			# Map item
 378			if item_mapper:
 379				try:
 380					mapped_item = own_processor.get_mapped_item(item)
 381				except MapItemException as e:
 382					if warn_unmappable:
 383						self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
 384						unmapped_items = True
 385					continue
 386
 387				# check if fields have been marked as 'missing' in the
 388				# underlying data, and treat according to the chosen strategy
 389				if mapped_item.get_missing_fields():
 390					for missing_field in mapped_item.get_missing_fields():
 391						strategy = map_missing.get(missing_field, default_strategy)
 392
 393						if callable(strategy):
 394							# delegate handling to a callback
 395							mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field)
 396						elif strategy == "abort":
 397							# raise an exception to be handled at the processor level
 398							raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.")
 399						elif strategy == "default":
 400							# use whatever was passed to the object constructor
 401							mapped_item.data[missing_field] = mapped_item.data[missing_field].value
 402						else:
 403							raise ValueError("map_missing must be 'abort', 'default', or a callback.")
 404
 405			else:
 406				mapped_item = original_item
 407
 408			# yield a DatasetItem, which is a dict with some special properties
 409			yield DatasetItem(mapper=item_mapper, original=original_item, mapped_object=mapped_item, **(mapped_item.get_item_data() if type(mapped_item) is MappedItem else mapped_item))
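	# Illustrative sketch of consuming iterate_items(); not part of the
	# original source. `self` is assumed to be a processor with a
	# `source_dataset` attribute; the field names and `process()` are
	# hypothetical.
	#
	#   for item in self.source_dataset.iterate_items(
	#           processor=self,
	#           warn_unmappable=True,
	#           map_missing={"body": "default",
	#                        "author": lambda mapped, field: "unknown"}):
	#       # `item` acts as a dict of the mapped item; the raw record is
	#       # still available as `item.original`
	#       process(item["body"], item.original)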
 410
 411	def get_staging_area(self):
 412		"""
 413		Get path to a temporary folder in which files can be stored before
 414		finishing
 415
 416		The folder is created by this method and is guaranteed not to have
 417		existed before. The folder may be used as a staging area for the dataset data
 418		while it is being processed.
 419
 420		:return Path:  Path to folder
 421		"""
 422		results_file = self.get_results_path()
 423
 424		results_dir_base = results_file.parent
 425		results_dir = results_file.name.replace(".", "") + "-staging"
 426		results_path = results_dir_base.joinpath(results_dir)
 427		index = 1
 428		while results_path.exists():
 429			results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
 430			index += 1
 431
 432		# create temporary folder
 433		results_path.mkdir()
 434
 435		# Storing the staging area with the dataset so that it can be removed later
 436		self.staging_areas.append(results_path)
 437
 438		return results_path
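	# Illustrative sketch, not part of the original source: a processor might
	# use a staging area for intermediate files and clean up afterwards.
	#
	#   staging = dataset.get_staging_area()       # freshly created folder
	#   (staging / "intermediate.json").write_text("{}")
	#   # ... work with the files ...
	#   dataset.remove_staging_areas()             # removes all staging folders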
 439
 440	def remove_staging_areas(self):
 441		"""
 442		Remove any staging areas that were created and all files contained in them.
 443		"""
 444		# Remove DataSet staging areas
 445		if self.staging_areas:
 446			for staging_area in self.staging_areas:
 447				if staging_area.is_dir():
 448					shutil.rmtree(staging_area)
 449
 450	def finish(self, num_rows=0):
 451		"""
 452		Declare the dataset finished
 453		"""
 454		if self.data["is_finished"]:
 455			raise RuntimeError("Cannot finish a finished dataset again")
 456
 457		self.db.update("datasets", where={"key": self.data["key"]},
 458					   data={"is_finished": True, "num_rows": num_rows, "progress": 1.0, "timestamp_finished": int(time.time())})
 459		self.data["is_finished"] = True
 460		self.data["num_rows"] = num_rows
 461
 462	def copy(self, shallow=True):
 463		"""
 464		Copies the dataset, making a new version with a unique key
 465
 466
 467		:param bool shallow:  Shallow copy: does not copy the result file, but
 468		instead refers to the same file as the original dataset did
 469		:return Dataset:  Copied dataset
 470		"""
 471		parameters = self.parameters.copy()
 472
 473		# a key is partially based on the parameters. so by setting these extra
 474		# attributes, we also ensure a unique key will be generated for the
 475		# copy
 476		# possibly todo: don't use time for uniqueness (but one shouldn't be
 477		# copying a dataset multiple times per microsecond, that's not what
 478		# this is for)
 479		parameters["copied_from"] = self.key
 480		parameters["copied_at"] = time.time()
 481
 482		copy = DataSet(parameters=parameters, db=self.db, extension=self.result_file.split(".")[-1], type=self.type, modules=self.modules)
 483		for field in self.data:
 484			if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
 485				continue
 486
 487			copy.__setattr__(field, self.data[field])
 488
 489		if shallow:
 490			# use the same result file
 491			copy.result_file = self.result_file
 492		else:
 493			# copy to new file with new key
 494			shutil.copy(self.get_results_path(), copy.get_results_path())
 495
 496		if self.is_finished():
 497			copy.finish(self.num_rows)
 498
 499		# make sure ownership is also copied
 500		copy.copy_ownership_from(self)
 501
 502		return copy
 503
 504	def delete(self, commit=True, queue=None):
 505		"""
 506		Delete the dataset, and all its children
 507
 508		Deletes both database records and result files. Note that manipulating
 509		a dataset object after it has been deleted is undefined behaviour.
 510
 511		:param bool commit:  Commit SQL DELETE query?
 512		"""
 513		# first, recursively delete children
 514		children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
 515		for child in children:
 516			try:
 517				child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 518				child.delete(commit=commit)
 519			except DataSetException:
 520				# dataset already deleted - race condition?
 521				pass
 522
 523		# delete any queued jobs for this dataset
 524		try:
 525			job = Job.get_by_remote_ID(self.key, self.db, self.type)
 526			if job.is_claimed:
 527				# tell API to stop any jobs running for this dataset
 528				# level 2 = cancel job
 529				# we're not interested in the result - if the API is available,
 530				# it will do its thing, if it's not the backend is probably not
 531				# running so the job also doesn't need to be interrupted
 532				call_api(
 533					"cancel-job",
 534					{"remote_id": self.key, "jobtype": self.type, "level": 2},
 535					False
 536				)
 537
 538			# this deletes the job from the database
 539			job.finish(True)
 540
 541		except JobNotFoundException:
 542			pass
 543
 544		# delete from database
 545		self.db.delete("datasets", where={"key": self.key}, commit=commit)
 546		self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
 547		self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
 548
 549		# delete from drive
 550		try:
 551			if self.get_results_path().exists():
 552				self.get_results_path().unlink()
 553			if self.get_results_path().with_suffix(".log").exists():
 554				self.get_results_path().with_suffix(".log").unlink()
 555			if self.get_results_folder_path().exists():
 556				shutil.rmtree(self.get_results_folder_path())
 557
 558		except FileNotFoundError:
 559			# already deleted, apparently
 560			pass
 561		except PermissionError as e:
 562			self.db.log.error(f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}")
 563
 564	def update_children(self, **kwargs):
 565		"""
 566		Update an attribute for all child datasets
 567
 568		Can be used to e.g. change the owner, version, finished status for all
 569		datasets in a tree
 570
 571		:param kwargs:  Parameters corresponding to known dataset attributes
 572		"""
 573		children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
 574		for child in children:
 575			child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 576			for attr, value in kwargs.items():
 577				child.__setattr__(attr, value)
 578
 579			child.update_children(**kwargs)
 580
 581	def is_finished(self):
 582		"""
 583		Check if dataset is finished
 584		:return bool:
 585		"""
 586		return self.data["is_finished"] == True
 587
 588	def is_rankable(self, multiple_items=True):
 589		"""
 590		Determine if a dataset is rankable
 591
 592		Rankable means that it is a CSV file with 'date' and 'value' columns
 593		as well as one or more item label columns
 594
 595		:param bool multiple_items:  Consider datasets with multiple value
 596		columns per item (e.g. word_1, word_2, etc.)?
 597
 598		:return bool:  Whether the dataset is rankable or not
 599		"""
 600		if self.get_results_path().suffix != ".csv" or not self.get_results_path().exists():
 601			return False
 602
 603		column_options = {"date", "value", "item"}
 604		if multiple_items:
 605			column_options.add("word_1")
 606
 607		with self.get_results_path().open(encoding="utf-8") as infile:
 608			reader = csv.DictReader(infile)
 609			try:
 610				return len(set(reader.fieldnames) & column_options) >= 3
 611			except (TypeError, ValueError):
 612				return False
 613
 614	def is_accessible_by(self, username, role="owner"):
 615		"""
 616		Check if the dataset has the given user as owner, directly or via a tag
 617
 618		:param str|User username: Username to check for
 619		:return bool:
 620		"""
 621		if type(username) is not str:
 622			if hasattr(username, "get_id"):
 623				username = username.get_id()
 624			else:
 625				raise TypeError("User must be a str or User object")
 626
 627		# 'normal' owners
 628		if username in [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]:
 629			return True
 630
 631		# owners that are owners by virtue of a tag
 632		if username in itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if (role is None or self.owners[f"tag:{tag}"]["role"] == role)]):
 633			return True
 634
 635		return False
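	# Illustrative sketch, not part of the original source; `current_user` is
	# a hypothetical username or User object.
	#
	#   if dataset.is_accessible_by(current_user, role="owner"):
	#       pass  # user owns the dataset, directly or via a tag
	#   elif dataset.is_accessible_by(current_user, role="viewer"):
	#       pass  # user may view but does not own the dataset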
 636
 637	def get_owners_users(self, role="owner"):
 638		"""
 639		Get list of dataset owners
 640
 641		This returns a list of *users* that are considered owners. Tags are
 642		transparently replaced with the users with that tag.
 643
 644		:param str|None role:  Role to check for. If `None`, all owners are
 645		returned regardless of role.
 646
 647		:return set:  De-duplicated owner list
 648		"""
 649		# 'normal' owners
 650		owners = [owner for owner, meta in self.owners.items() if
 651				  (role is None or meta["role"] == role) and not owner.startswith("tag:")]
 652
 653		# owners that are owners by virtue of a tag
 654		owners.extend(itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if
 655									   role is None or self.owners[f"tag:{tag}"]["role"] == role]))
 656
 657		# de-duplicate before returning
 658		return set(owners)
 659
 660	def get_owners(self, role="owner"):
 661		"""
 662		Get list of dataset owners
 663
 664		This returns a list of all owners, and does not transparently resolve
 665		tags (like `get_owners_users` does).
 666
 667		:param str|None role:  Role to check for. If `None`, all owners are
 668		returned regardless of role.
 669
 670		:return list:  Owner list (tags are not resolved to users)
 671		"""
 672		return [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]
 673
 674	def add_owner(self, username, role="owner"):
 675		"""
 676		Set dataset owner
 677
 678		If the user is already an owner, but with a different role, the role is
 679		updated. If the user is already an owner with the same role, nothing happens.
 680
 681		:param str|User username:  Username to set as owner
 682		:param str|None role:  Role to add user with.
 683		"""
 684		if type(username) is not str:
 685			if hasattr(username, "get_id"):
 686				username = username.get_id()
 687			else:
 688				raise TypeError("User must be a str or User object")
 689
 690		if username not in self.owners:
 691			self.owners[username] = {
 692				"name": username,
 693				"key": self.key,
 694				"role": role
 695			}
 696			self.db.insert("datasets_owners", data=self.owners[username], safe=True)
 697
 698		elif username in self.owners and self.owners[username]["role"] != role:
 699			self.db.update("datasets_owners", data={"role": role}, where={"name": username, "key": self.key})
 700			self.owners[username]["role"] = role
 701
 702		if username.startswith("tag:"):
 703			# this is a bit more complicated than just adding to the list of
 704			# owners, so do a full refresh
 705			self.refresh_owners()
 706
 707		# make sure children's owners remain in sync
 708		for child in self.children:
 709			child.add_owner(username, role)
 710			# not recursive, since we're calling it from recursive code!
 711			child.copy_ownership_from(self, recursive=False)
 712
 713	def remove_owner(self, username):
 714		"""
 715		Remove dataset owner
 716
 717		If no owner remains afterwards, the dataset is assigned to the anonymous user.
 718		If the user is not an owner, nothing happens.
 719
 720		:param str|User username:  Username to remove as owner
 721		"""
 722		if type(username) is not str:
 723			if hasattr(username, "get_id"):
 724				username = username.get_id()
 725			else:
 726				raise TypeError("User must be a str or User object")
 727
 728		if username in self.owners:
 729			del self.owners[username]
 730			self.db.delete("datasets_owners", where={"name": username, "key": self.key})
 731
 732			if not self.owners:
 733				self.add_owner("anonymous")
 734
 735		if username in self.tagged_owners:
 736			del self.tagged_owners[username]
 737
 738		# make sure children's owners remain in sync
 739		for child in self.children:
 740			child.remove_owner(username)
 741			# not recursive, since we're calling it from recursive code!
 742			child.copy_ownership_from(self, recursive=False)
 743
 744	def refresh_owners(self):
 745		"""
 746		Update internal owner cache
 747
 748		This makes sure that the list of *users* and *tags* which can access the
 749		dataset is up to date.
 750		"""
 751		self.owners = {owner["name"]: owner for owner in self.db.fetchall("SELECT * FROM datasets_owners WHERE key = %s", (self.key,))}
 752
 753		# determine which users (if any) are owners of the dataset by having a
 754		# tag that is listed as an owner
 755		owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
 756		if owner_tags:
 757			tagged_owners = self.db.fetchall("SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,))
 758			self.tagged_owners = {
 759				owner_tag: [user["name"] for user in tagged_owners if owner_tag in user["tags"]]
 760				for owner_tag in owner_tags
 761			}
 762		else:
 763			self.tagged_owners = {}
 764
 765	def copy_ownership_from(self, dataset, recursive=True):
 766		"""
 767		Copy ownership
 768
 769		This is useful to e.g. make sure a dataset's ownership stays in sync
 770		with its parent
 771
 772		:param Dataset dataset:  Dataset to copy ownership from
 773		:param bool recursive:  Also copy ownership to this dataset's children?
 774		"""
 775		self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
 776
 777		for role in ("owner", "viewer"):
 778			owners = dataset.get_owners(role=role)
 779			for owner in owners:
 780				self.db.insert("datasets_owners", data={"key": self.key, "name": owner, "role": role}, commit=False, safe=True)
 781
 782		self.db.commit()
 783		if recursive:
 784			for child in self.children:
 785				child.copy_ownership_from(self, recursive=recursive)
 786
 787	def get_parameters(self):
 788		"""
 789		Get dataset parameters
 790
 791		The dataset parameters are stored as JSON in the database - parse them
 792		and return the resulting object
 793
 794		:return:  Dataset parameters as originally stored
 795		"""
 796		try:
 797			return json.loads(self.data["parameters"])
 798		except json.JSONDecodeError:
 799			return {}
 800
 801	def get_columns(self):
 802		"""
 803		Returns the dataset columns.
 804
 805		Useful for processor input forms. Can deal with both CSV and NDJSON
 806		files, the latter only if a `map_item` function is available in the
 807		processor that generated it. While in other cases one could use the
 808		keys of the JSON object, this is not always possible in follow-up code
 809		that uses the 'column' names, so for consistency this function acts as
 810		if no column can be parsed if no `map_item` function exists.
 811
 812		:return list:  List of dataset columns; empty list if unable to parse
 813		"""
 814		if not self.get_results_path().exists():
 815			# no file to get columns from
 816			return []
 817
 818		if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)):
 819			items = self.iterate_items(warn_unmappable=False)
 820			try:
 821				keys = list(items.__next__().keys())
 822			except (StopIteration, NotImplementedError):
 823				# No items or otherwise unable to iterate
 824				return []
 825			finally:
 826				del items
 827			return keys
 828		else:
 829			# Filetype not CSV or an NDJSON with `map_item`
 830			return []
 831
 832	def get_annotation_fields(self):
 833		"""
 834		Retrieves the saved annotation fields for this dataset.
 835		:return dict: The saved annotation fields.
 836		"""
 837
 838		annotation_fields = self.db.fetchone("SELECT annotation_fields FROM datasets WHERE key = %s;", (self.top_parent().key,))
 839		
 840		if annotation_fields and annotation_fields.get("annotation_fields"):
 841			annotation_fields = json.loads(annotation_fields["annotation_fields"])
 842		else:
 843			annotation_fields = {}
 844
 845		return annotation_fields
 846
 847	def get_annotations(self):
 848		"""
 849		Retrieves the annotations for this dataset.
 850		:return dict: The annotations, or None if none are saved
 851		"""
 852
 853		annotations = self.db.fetchone("SELECT annotations FROM annotations WHERE key = %s;", (self.top_parent().key,))
 854
 855		if annotations and annotations.get("annotations"):
 856			return json.loads(annotations["annotations"])
 857		else:
 858			return None
 859
 860	def update_label(self, label):
 861		"""
 862		Update label for this dataset
 863
 864		:param str label:  New label
 865		:return str:  The new label, as returned by get_label
 866		"""
 867		self.parameters["label"] = label
 868
 869		self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
 870		return self.get_label()
 871
 872	def get_label(self, parameters=None, default="Query"):
 873		"""
 874		Generate a readable label for the dataset
 875
 876		:param dict parameters:  Parameters of the dataset
 877		:param str default:  Label to use if it cannot be inferred from the
 878		parameters
 879
 880		:return str:  Label
 881		"""
 882		if not parameters:
 883			parameters = self.parameters
 884
 885		if parameters.get("label"):
 886			return parameters["label"]
 887		elif parameters.get("body_query") and parameters["body_query"] != "empty":
 888			return parameters["body_query"]
 889		elif parameters.get("body_match") and parameters["body_match"] != "empty":
 890			return parameters["body_match"]
 891		elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
 892			return parameters["subject_query"]
 893		elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
 894			return parameters["subject_match"]
 895		elif parameters.get("query"):
 896			label = parameters["query"]
 897			# Some legacy datasets have lists as query data
 898			if isinstance(label, list):
 899				label = ", ".join(label)
 900
 901			label = label if len(label) < 30 else label[:25] + "..."
 902			label = label.strip().replace("\n", ", ")
 903			return label
 904		elif parameters.get("country_flag") and parameters["country_flag"] != "all":
 905			return "Flag: %s" % parameters["country_flag"]
 906		elif parameters.get("country_name") and parameters["country_name"] != "all":
 907			return "Country: %s" % parameters["country_name"]
 908		elif parameters.get("filename"):
 909			return parameters["filename"]
 910		elif parameters.get("board") and "datasource" in parameters:
 911			return parameters["datasource"] + "/" + parameters["board"]
 912		elif "datasource" in parameters and parameters["datasource"] in self.modules.datasources:
 913			return self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
 914		else:
 915			return default
 916
 917	def change_datasource(self, datasource):
 918		"""
 919		Change the datasource type for this dataset
 920
 921		:param str datasource:  New datasource type
 922		:return str:  The new datasource type
 923		"""
 924
 925		self.parameters["datasource"] = datasource
 926
 927		self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
 928		return datasource
 929
 930	def reserve_result_file(self, parameters=None, extension="csv"):
 931		"""
 932		Generate a unique path to the results file for this dataset
 933
 934		This generates a file name for the data file of this dataset, and makes sure
 935		no file exists or will exist at that location other than the file we
 936		expect (i.e. the data for this particular dataset).
 937
 938		:param str extension: File extension, "csv" by default
 939		:param parameters:  Dataset parameters
 940		:return bool:  Whether the file path was successfully reserved
 941		"""
 942		if self.data["is_finished"]:
 943			raise RuntimeError("Cannot reserve results file for a finished dataset")
 944
 945		# Use 'random' for random post queries
 946		if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
 947			file = 'random-' + str(parameters["random_amount"]) + '-' + self.data["key"]
 948		# Use country code for country flag queries
 949		elif "country_flag" in parameters and parameters["country_flag"] != 'all':
 950			file = 'countryflag-' + str(parameters["country_flag"]) + '-' + self.data["key"]
 951		# Use the query string for all other queries
 952		else:
 953			query_bit = self.data["query"].replace(" ", "-").lower()
 954			query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
 955			query_bit = query_bit[:100]  # Crop to avoid OSError
 956			file = query_bit + "-" + self.data["key"]
 957			file = re.sub(r"[-]+", "-", file)
 958
 959		path = self.folder.joinpath(file + "." + extension.lower())
 960		index = 1
 961		while path.is_file():
 962			path = self.folder.joinpath(file + "-" + str(index) + "." + extension.lower())
 963			index += 1
 964
 965		file = path.name
 966		updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
 967								 data={"result_file": file})
 968		self.data["result_file"] = file
 969		return updated > 0
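	# Worked example (illustrative, not part of the original source): for a
	# dataset with query "Hello, World!" and key "abc123", the reserved file
	# name would be "hello-world-abc123.csv"; if that file already exists,
	# "hello-world-abc123-1.csv", "-2", and so on are tried.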
 970
 971	def get_key(self, query, parameters, parent="", time_offset=0):
 972		"""
 973		Generate a unique key for this dataset that can be used to identify it
 974
 975		The key is a hash of a combination of the query string and parameters.
 976		You never need to call this, really: it's used internally.
 977
 978		:param str query:  Query string
 979		:param parameters:  Dataset parameters
 980		:param parent: Parent dataset's key (if applicable)
 981		:param time_offset:  Offset to add to the time component of the dataset
 982		key. This can be used to ensure a unique key even if the parameters and
 983		timing are otherwise identical to an existing dataset's
 984
 985		:return str:  Dataset key
 986		"""
 987		# Return a hash based on parameters
 988		# we're going to use the hash of the parameters to uniquely identify
 989		# the dataset, so make sure it's always in the same order, or we might
 990		# end up creating multiple keys for the same dataset if python
 991		# decides to return the dict in a different order
 992		param_key = collections.OrderedDict()
 993		for key in sorted(parameters):
 994			param_key[key] = parameters[key]
 995
 996		# we additionally use the current time as a salt - this should usually
 997		# ensure a unique key for the dataset. if there is nonetheless a hash
 998		# collision, a new key is generated below with a random time offset
 999		param_key["_salt"] = int(time.time()) + time_offset
1000
1001		parent_key = str(parent) if parent else ""
1002		plain_key = repr(param_key) + str(query) + parent_key
1003		hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()
1004
1005		if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1006			# key exists, generate a new one
1007			return self.get_key(query, parameters, parent, time_offset=random.randint(1,10))
1008		else:
1009			return hashed_key
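	# Illustrative sketch of the key derivation above; not part of the
	# original source, and the parameter values are hypothetical.
	#
	#   import collections, hashlib, time
	#   params = collections.OrderedDict(sorted({"query": "cats"}.items()))
	#   params["_salt"] = int(time.time())
	#   plain_key = repr(params) + "cats" + ""   # repr(params) + query + parent
	#   key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()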
1010
1011	def set_key(self, key):
1012		"""
1013		Change dataset key
1014
1015		In principle, keys should never be changed. But there are rare cases
1016		where it is useful to do so, in particular when importing a dataset
1017		from another 4CAT instance; in that case it makes sense to try and
1018		ensure that the key is the same as it was before. This function sets
1019		the dataset key and updates any dataset references to it.
1020
1021		:param str key:  Key to set
1022		:return str:  Key that was set. If the desired key already exists, the
1023		original key is kept.
1024		"""
1025		key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1026		if key_exists or not key:
1027			return self.key
1028
1029		old_key = self.key
1030		self.db.update("datasets", data={"key": key}, where={"key": old_key})
1031
1032		# update references
1033		self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key})
1034		self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1035		self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1036		self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1037
1038		# for good measure
1039		self.db.commit()
1040		self.key = key
1041
1042		return self.key
1043
1044	def get_status(self):
1045		"""
1046		Get Dataset status
1047
1048		:return string: Dataset status
1049		"""
1050		return self.data["status"]
1051
1052	def update_status(self, status, is_final=False):
1053		"""
1054		Update dataset status
1055
1056		The status is a string that may be displayed to a user to keep them
1057		updated and informed about the progress of a dataset. No memory is kept
1058		of earlier dataset statuses; the current status is overwritten when
1059		updated.
1060
1061		Statuses are also written to the dataset log file.
1062
1063		:param string status:  Dataset status
1064		:param bool is_final:  If this is `True`, subsequent calls to this
1065		method while the object is instantiated will not update the dataset
1066		status.
1067		:return bool:  Status update successful?
1068		"""
1069		if self.no_status_updates:
1070			return
1071
1072		# for presets, copy the updated status to the preset(s) this is part of
1073		if self.preset_parent is None:
1074			self.preset_parent = [parent for parent in self.get_genealogy() if parent.type.find("preset-") == 0 and parent.key != self.key][:1]
1075
1076		if self.preset_parent:
1077			for preset_parent in self.preset_parent:
1078				if not preset_parent.is_finished():
1079					preset_parent.update_status(status)
1080
1081		self.data["status"] = status
1082		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"status": status})
1083
1084		if is_final:
1085			self.no_status_updates = True
1086
1087		self.log(status)
1088
1089		return updated > 0
1090
1091	def update_progress(self, progress):
1092		"""
1093		Update dataset progress
1094
1095		The progress can be used to indicate to a user how close the dataset
1096		is to completion.
1097
1098		:param float progress:  Between 0 and 1.
1099		:return:
1100		"""
1101		progress = min(1, max(0, progress))  # clamp
1102		if type(progress) is int:
1103			progress = float(progress)
1104
1105		self.data["progress"] = progress
1106		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress})
1107		return updated > 0
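	# Illustrative sketch, not part of the original source: a processor loop
	# reporting status and progress while working through items (`items` and
	# `total` are hypothetical).
	#
	#   for i, item in enumerate(items):
	#       if i % 100 == 0:
	#           dataset.update_status(f"Processed {i:,} of {total:,} items")
	#           dataset.update_progress(i / total)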
1108
1109	def get_progress(self):
1110		"""
1111		Get dataset progress
1112
1113		:return float:  Progress, between 0 and 1
1114		"""
1115		return self.data["progress"]
1116
1117	def finish_with_error(self, error):
1118		"""
1119		Set error as final status, and finish with 0 results
1120
1121		This is a convenience function to avoid having to repeat
1122		"update_status" and "finish" a lot.
1123
1124		:param str error:  Error message for final dataset status.
1125		:return:
1126		"""
1127		self.update_status(error, is_final=True)
1128		self.finish(0)
1129
1130		return None
1131
1132	def update_version(self, version):
1133		"""
1134		Update software version used for this dataset
1135
1136		This can be used to verify the code that was used to process this dataset.
1137
1138		:param string version:  Version identifier
1139		:return bool:  Update successful?
1140		"""
1141		try:
1142			# this fails if the processor type is unknown
1143			# edge case, but let's not crash...
1144			processor_path = self.modules.processors.get(self.data["type"]).filepath
1145		except AttributeError:
1146			processor_path = ""
1147
1148		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={
1149			"software_version": version[0],
1150			"software_source": version[1],
1151			"software_file": processor_path
1152		})
1153
1154		return updated > 0
1155
1156	def delete_parameter(self, parameter, instant=True):
1157		"""
1158		Delete a parameter from the dataset metadata
1159
1160		:param string parameter:  Parameter to delete
1161		:param bool instant:  Also delete parameters in this instance object?
1162		:return bool:  Update successful?
1163		"""
1164		parameters = self.parameters.copy()
1165		if parameter in parameters:
1166			del parameters[parameter]
1167		else:
1168			return False
1169
1170		updated = self.db.update("datasets", where={"key": self.data["key"]},
1171								 data={"parameters": json.dumps(parameters)})
1172
1173		if instant:
1174			self.parameters = parameters
1175
1176		return updated > 0
1177
1178	def get_version_url(self, file):
1179		"""
1180		Get a versioned github URL for the version this dataset was processed with
1181
1182		:param file:  File to link within the repository
1183		:return:  URL, or an empty string
1184		"""
1185		if not self.data["software_source"]:
1186			return ""
1187
1188		filepath = self.data.get("software_file", "")
1189		if filepath.startswith("/extensions/"):
1190			# go to root of extension
1191			filepath = "/" + "/".join(filepath.split("/")[3:])
1192
1193		return self.data["software_source"] + "/blob/" + self.data["software_version"] + filepath
1194
1195	def top_parent(self):
1196		"""
1197		Get root dataset
1198
1199		Traverses the tree of datasets this one is part of until it finds one
1200		with no source dataset of its own, then returns that dataset.
1201
1202		:return Dataset: Root dataset
1203		"""
1204		genealogy = self.get_genealogy()
1205		return genealogy[0]
1206
1207	def get_genealogy(self, inclusive=False):
1208		"""
1209		Get genealogy of this dataset
1210
1211		Creates a list of DataSet objects, with the first one being the
1212		'top' dataset, and each subsequent one being a child of the previous
1213		one, ending with the current dataset.
1214
1215		:return list:  Dataset genealogy, oldest dataset first
1216		"""
1217		if self.genealogy and not inclusive:
1218			return self.genealogy
1219
1220		key_parent = self.key_parent
1221		genealogy = []
1222
1223		while key_parent:
1224			try:
1225				parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1226			except DataSetException:
1227				break
1228
1229			genealogy.append(parent)
1230			if parent.key_parent:
1231				key_parent = parent.key_parent
1232			else:
1233				break
1234
1235		genealogy.reverse()
1236		genealogy.append(self)
1237
1238		self.genealogy = genealogy
1239		return self.genealogy
1240
1241	def get_all_children(self, recursive=True):
1242		"""
1243		Get all children of this dataset
1244
1245		Results are returned as a non-hierarchical list, i.e. the result does
1246		not reflect the actual dataset hierarchy (but all datasets in the
1247		result will have the original dataset as an ancestor somewhere)
1248
1249		:return list:  List of DataSets
1250		"""
1251		children = [DataSet(data=record, db=self.db, modules=self.modules) for record in self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))]
1252		results = children.copy()
1253		if recursive:
1254			for child in children:
1255				results += child.get_all_children(recursive)
1256
1257		return results
1258
1259	def nearest(self, type_filter):
1260		"""
1261		Return nearest dataset that matches the given type
1262
1263		Starting with this dataset, traverse the hierarchy upwards and return
1264		whichever dataset matches the given type.
1265
1266		:param str type_filter:  Type filter. Can contain wildcards and is matched
1267		using `fnmatch.fnmatch`.
1268		:return:  Nearest matching dataset, or `None` if none match.
1269		"""
1270		genealogy = self.get_genealogy(inclusive=True)
1271		for dataset in reversed(genealogy):
1272			if fnmatch.fnmatch(dataset.type, type_filter):
1273				return dataset
1274
1275		return None
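	# Illustrative sketch, not part of the original source: find the closest
	# ancestor (or this dataset itself) produced by a search-type processor.
	#
	#   source = dataset.nearest("*-search")
	#   if source is not None:
	#       print(source.type, source.key)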
1276
1277	def get_breadcrumbs(self):
1278		"""
1279		Get breadcrumbs navlink for use in permalinks
1280
1281		Returns a string representing this dataset's genealogy that may be used
1282		to uniquely identify it.
1283
1284		:return str: Nav link
1285		"""
1286		if self.genealogy:
1287			return ",".join([dataset.key for dataset in self.genealogy])
1288		else:
1289			# Collect keys only
1290			key_parent = self.key  # Start at the bottom
1291			genealogy = []
1292
1293			while key_parent:
1294				try:
1295					parent = self.db.fetchone("SELECT key_parent FROM datasets WHERE key = %s", (key_parent,))
1296				except TypeError:
1297					break
1298
1299				key_parent = parent["key_parent"]
1300				if key_parent:
1301					genealogy.append(key_parent)
1302				else:
1303					break
1304
1305			genealogy.reverse()
1306			genealogy.append(self.key)
1307			return ",".join(genealogy)
1308
1309	def get_compatible_processors(self, user=None):
1310		"""
1311		Get list of processors compatible with this dataset
1312
1313		Checks whether this dataset type is one that is listed as being accepted
1314		by the processor, for each known type: if the processor does not
1315		specify accepted types (via the `is_compatible_with` method), it is
1316	assumed it accepts any top-level dataset
1317
1318		:param str|User|None user:  User to get compatibility for. If set,
1319		use the user-specific config settings where available.
1320
1321		:return dict:  Compatible processors, `name => class` mapping
1322		"""
1323		processors = self.modules.processors
1324
1325		available = {}
1326		for processor_type, processor in processors.items():
1327			if processor.is_from_collector():
1328				continue
1329
1330			own_processor = self.get_own_processor()
1331			if own_processor and own_processor.exclude_followup_processors(processor_type):
1332				continue
1333
1334			# consider a processor compatible if its is_compatible_with
1335			# method returns True *or* if it has no explicit compatibility
1336			# check and this dataset is top-level (i.e. has no parent)
1337			if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1338					or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, user=user)):
1339				available[processor_type] = processor
1340
1341		return available
1342
1343	def get_place_in_queue(self, update=False):
1344		"""
1345		Determine dataset's position in queue
1346
1347		If the dataset is already finished, the position is -1. Else, the
1348		position is the number of datasets to be completed before this one will
1349		be processed. A position of 0 would mean that the dataset is currently
1350		being executed, or that the backend is not running.
1351
1352		:param bool update:  Update the queue position from database if True, else return cached value
1353		:return int:  Queue position
1354		"""
1355		if self.is_finished() or not self.data.get("job"):
1356			self._queue_position = -1
1357			return self._queue_position
1358		elif not update and self._queue_position is not None:
1359			# Use cached value
1360			return self._queue_position
1361		else:
1362			# Collect queue position from database via the job
1363			try:
1364				job = Job.get_by_ID(self.data["job"], self.db)
1365				self._queue_position = job.get_place_in_queue()
1366			except JobNotFoundException:
1367				self._queue_position = -1
1368
1369			return self._queue_position
1370
1371	def get_modules(self):
1372		"""
1373		Get 4CAT modules
1374
1375		Is a function because loading them is not free, and this way we can
1376		cache the result.
1377
1378		:return:
1379		"""
1380		if not self.modules:
1381			self.modules = ModuleCollector()
1382
1383		return self.modules
1384
1385	def get_own_processor(self):
1386		"""
1387		Get the processor class that produced this dataset
1388
1389		:return:  Processor class, or `None` if not available.
1390		"""
1391		processor_type = self.parameters.get("type", self.data.get("type"))
1392
1393		return self.modules.processors.get(processor_type)
1394
1395	def get_available_processors(self, user=None, exclude_hidden=False):
1396		"""
1397		Get list of processors that may be run for this dataset
1398
1399		Returns all compatible processors except for those that are already
1400		queued or finished and have no options. Processors that have been
1401		run but have options are included so they may be run again with a
1402		different configuration
1403
1404		:param str|User|None user:  User to get compatibility for. If set,
1405		use the user-specific config settings where available.
1406		:param bool exclude_hidden:  Exclude processors that should not be displayed
1407		in the UI? If `False`, all processors are returned.
1408
1409		:return dict:  Available processors, `name => properties` mapping
1410		"""
1411		if self.available_processors:
1412			# Update to reflect exclude_hidden parameter which may be different from last call
1413			# TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1414			return {processor_type: processor for processor_type, processor in self.available_processors.items() if not exclude_hidden or not processor.is_hidden}
1415
1416		processors = self.get_compatible_processors(user=user)
1417
1418		for analysis in self.children:
1419			if analysis.type not in processors:
1420				continue
1421
1422			if not processors[analysis.type].get_options():
1423				del processors[analysis.type]
1424				continue
1425
1426			if exclude_hidden and processors[analysis.type].is_hidden:
1427				del processors[analysis.type]
1428
1429		self.available_processors = processors
1430		return processors
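	# Illustrative sketch, not part of the original source. `current_user` is
	# a hypothetical user object used for configuration context.
	#
	#   runnable = dataset.get_available_processors(user=current_user,
	#                                               exclude_hidden=True)
	#   for processor_type in runnable:
	#       print(processor_type)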
1431
1432	def link_job(self, job):
1433		"""
1434		Link this dataset to a job ID
1435
1436		Updates the dataset data to include a reference to the job that will be
1437		executing (or has already executed) this dataset.
1438
1439		Note that if no job can be found for this dataset, this method silently
1440		fails.
1441
1442		:param Job job:  The job that will run this dataset
1443
1444		:todo: If the job column ever gets used, make sure it always contains
1445		       a valid value, rather than silently failing this method.
1446		"""
1447		if type(job) != Job:
1448			raise TypeError("link_job requires a Job object as its argument")
1449
1450		if "id" not in job.data:
1451			try:
1452				job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1453			except JobNotFoundException:
1454				return
1455
1456		self.db.update("datasets", where={"key": self.key}, data={"job": job.data["id"]})
1457
1458	def link_parent(self, key_parent):
1459		"""
1460		Set source_dataset key for this dataset
1461
1462		:param key_parent:  Parent key. Not checked for validity
1463		"""
1464		self.db.update("datasets", where={"key": self.key}, data={"key_parent": key_parent})
1465
1466	def get_parent(self):
1467		"""
1468		Get parent dataset
1469
1470		:return DataSet:  Parent dataset, or `None` if not applicable
1471		"""
1472		return DataSet(key=self.key_parent, db=self.db, modules=self.modules) if self.key_parent else None
1473
1474	def detach(self):
1475		"""
1476		Makes the dataset standalone, i.e. removes any reference to a source (parent) dataset
1477		"""
1478		self.link_parent("")
1479
1480	def is_dataset(self):
1481		"""
1482		Easy way to confirm this is a dataset.
1483		Used for checking processor and dataset compatibility,
1484		which needs to handle both processors and datasets.
1485		"""
1486		return True
1487
1488	def is_top_dataset(self):
1489		"""
1490		Easy way to confirm this is a top dataset.
1491		Used for checking processor and dataset compatibility,
1492		which needs to handle both processors and datasets.
1493		"""
1494		if self.key_parent:
1495			return False
1496		return True
1497
1498	def is_expiring(self, user=None):
1499		"""
1500		Determine if dataset is set to expire
1501
1502		Similar to `is_expired`, but checks if the dataset will be deleted in
1503		the future, not if it should be deleted right now.
1504
1505		:param user:  User to use for configuration context. Provide to make
1506		sure configuration overrides for this user are taken into account.
1507		:return bool|int:  `False`, or the expiration date as a Unix timestamp.
1508		"""
1509		# has someone opted out of deleting this?
1510		if self.parameters.get("keep"):
1511			return False
1512
1513		# is this dataset explicitly marked as expiring after a certain time?
1514		if self.parameters.get("expires-after"):
1515			return self.parameters.get("expires-after")
1516
1517		# is the data source configured to have its datasets expire?
1518		expiration = config.get("datasources.expiration", {}, user=user)
1519		if not expiration.get(self.parameters.get("datasource")):
1520			return False
1521
1522		# is there a timeout for this data source?
1523		if expiration.get(self.parameters.get("datasource")).get("timeout"):
1524			return self.timestamp + expiration.get(self.parameters.get("datasource")).get("timeout")
1525
1526		return False
1527
1528	def is_expired(self, user=None):
1529		"""
1530		Determine if dataset should be deleted
1531
1532		Datasets can be set to expire, but when they should be deleted depends
1533		on a number of factors. This checks them all.
1534
1535		:param user:  User to use for configuration context. Provide to make
1536		sure configuration overrides for this user are taken into account.
1537		:return bool:
1538		"""
1539		# if the dataset is not set to expire at all, it is not expired either
1540		if not self.is_expiring():
1541			return False
1542
1543		# is this dataset explicitly marked as expiring after a certain time?
1544		future = time.time() + 3600  # ensure we don't delete datasets with invalid expiration times
1545		if self.parameters.get("expires-after") and convert_to_int(self.parameters["expires-after"], future) < time.time():
1546			return True
1547
1548		# is the data source configured to have its datasets expire?
1549		expiration = config.get("datasources.expiration", {}, user=user)
1550		if not expiration.get(self.parameters.get("datasource")):
1551			return False
1552
1553		# is the dataset older than the set timeout?
1554		if expiration.get(self.parameters.get("datasource")).get("timeout"):
1555			return self.timestamp + expiration[self.parameters.get("datasource")]["timeout"] < time.time()
1556
1557		return False
1558
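	# Example: a minimal sketch of acting on expiration, assuming `dataset` is
	# an existing DataSet instance.
	#
	#     expires = dataset.is_expiring()
	#     if dataset.is_expired():
	#         dataset.delete()
	#     elif expires:
	#         print("dataset is set to expire at timestamp", expires)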
1559	def is_from_collector(self):
1560		"""
1561		Check if this dataset was made by a processor that collects data, i.e.
1562		a search or import worker.
1563
1564		:return bool:
1565		"""
1566		return self.type.endswith("-search") or self.type.endswith("-import")
1567
1568	def get_extension(self):
1569		"""
1570		Gets the file extension this dataset produces.
1571		Also checks whether the results file exists.
1572		Used for checking processor and dataset compatibility.
1573
1574		:return str|bool extension:  Extension, e.g. `csv`, or `False` if no results file exists
1575		"""
1576		if self.get_results_path().exists():
1577			return self.get_results_path().suffix[1:]
1578
1579		return False
1580
1581	def get_media_type(self):
1582		"""
1583		Gets the media type of the dataset file.
1584
1585		:return str: media type, e.g., "text"
1586		"""
1587		own_processor = self.get_own_processor()
1588		if hasattr(self, "media_type"):
1589			# media type can be defined explicitly in the dataset; this takes priority
1590			return self.media_type
1591		elif own_processor is not None:
1592			# or media type can be defined in the processor
1593			# some processors can set different media types for different datasets (e.g., import_media)
1594			if hasattr(own_processor, "media_type"):
1595				return own_processor.media_type
1596
1597		# Default to text
1598		return self.parameters.get("media_type", "text")
1599
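	# Example: a minimal sketch of using the extension and media type to decide
	# how to read a finished dataset, assuming `dataset` is an existing DataSet
	# instance with a results file.
	#
	#     if dataset.get_extension() == "csv" and dataset.get_media_type() == "text":
	#         for item in dataset.iterate_items():
	#             ...  # each item behaves like a dictionary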
1600	def get_metadata(self):
1601		"""
1602		Get dataset metadata
1603
1604		This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
1605		as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
1606		update its database (and ensure compatibility with the exporting version of 4CAT).
1607		"""
1608		metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
1609
1610		# get 4CAT version (presumably to ensure export is compatible with import)
1611		metadata["current_4CAT_version"] = get_software_version()
1612		return metadata
1613
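	# Example: a minimal sketch of dumping a dataset's metadata as JSON, e.g.
	# when exporting it; `dataset` is assumed to be an existing DataSet
	# instance. `default=str` guards against non-serialisable column values.
	#
	#     metadata = dataset.get_metadata()
	#     print(json.dumps(metadata, indent=2, default=str))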
1614	def get_result_url(self):
1615		"""
1616		Gets the 4CAT frontend URL of a dataset file.
1617
1618		Uses the Flask configuration values (i.e., `flask.server_name` and
1619		`flask.https`) plus the hardcoded '/result/' path.
1620		TODO: create more dynamic method of obtaining url.
1621		"""
1622		filename = self.get_results_path().name
1623		url_to_file = ('https://' if config.get("flask.https") else 'http://') + \
1624						config.get("flask.server_name") + '/result/' + filename
1625		return url_to_file
1626
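	# Example: the URL built above depends on the `flask.server_name` and
	# `flask.https` settings; a minimal sketch, assuming `dataset` is a
	# finished DataSet instance with a results file:
	#
	#     print(dataset.get_result_url())
	#     # e.g. https://fourcat.example.com/result/<result filename>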
1627	def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
1628		"""
1629		Log an item that is unable to be mapped and warn administrators.
1630
1631		:param int item_count:			Item index
1632		:param Processor processor:		Processor calling this function
1633		"""
1634		dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
1635
1636		# Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
1637		closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self
1638		# Log error to dataset log
1639		closest_dataset.log(dataset_error_message)
1640
1641		if warn_admins:
1642			if processor is not None:
1643				processor.log.warning(f"Processor {processor.type} unable to map all items for dataset {closest_dataset.key}.")
1644			elif hasattr(self.db, "log"):
1645				# borrow the database's log handler
1646				self.db.log.warning(f"Unable to map all items for dataset {closest_dataset.key}.")
1647			else:
1648				# No other log available
1649				raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn")
1650
1651	def __getattr__(self, attr):
1652		"""
1653		Getter so we don't have to use .data all the time
1654
1655		:param attr:  Data key to get
1656		:return:  Value
1657		"""
1658
1659		if attr in dir(self):
1660			# an explicitly defined attribute should always be called in favour
1661			# of this passthrough
1662			attribute = getattr(self, attr)
1663			return attribute
1664		elif attr in self.data:
1665			return self.data[attr]
1666		else:
1667			raise AttributeError("DataSet instance has no attribute %s" % attr)
1668
1669	def __setattr__(self, attr, value):
1670		"""
1671		Setter so we can flexibly update the database
1672
1673		Also updates internal data stores (.data etc). If the attribute is
1674		unknown, it is stored within the 'parameters' attribute.
1675
1676		:param str attr:  Attribute to update
1677		:param value:  New value
1678		"""
1679
1680		# don't override behaviour for *actual* class attributes
1681		if attr in dir(self):
1682			super().__setattr__(attr, value)
1683			return
1684
1685		if attr not in self.data:
1686			self.parameters[attr] = value
1687			attr = "parameters"
1688			value = self.parameters
1689
1690		if attr == "parameters":
1691			value = json.dumps(value)
1692
1693		self.db.update("datasets", where={"key": self.key}, data={attr: value})
1694
1695		self.data[attr] = value
1696
1697		if attr == "parameters":
1698			self.parameters = json.loads(value)
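	# Example: a minimal sketch of the attribute passthrough defined by
	# __getattr__ and __setattr__ above, assuming `dataset` is an existing
	# DataSet instance. `my_note` is a made-up parameter name.
	#
	#     print(dataset.num_rows)   # read from the underlying database record
	#     dataset.my_note = "test"  # unknown attribute: stored in 'parameters'
	#     print(dataset.parameters["my_note"])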
class DataSet(common.lib.fourcat_module.FourcatModule):
  26class DataSet(FourcatModule):
  27	"""
  28	Provide interface to safely register and run operations on a dataset
  29
  30	A dataset is a collection of:
  31	- A unique identifier
  32	- A set of parameters that demarcate the data contained within
  33	- The data
  34
  35	The data is usually stored in a file on the disk; the parameters are stored
  36	in a database. The handling of the data, et cetera, is done by other
  37	workers; this class defines method to create and manipulate the dataset's
  38	properties.
  39	"""
  40	# Attributes must be created here to ensure getattr and setattr work properly
  41	data = None
  42	key = ""
  43
  44	children = None
  45	available_processors = None
  46	genealogy = None
  47	preset_parent = None
  48	parameters = None
  49	modules = None
  50
  51	owners = None
  52	tagged_owners = None
  53
  54	db = None
  55	folder = None
  56	is_new = True
  57
  58	no_status_updates = False
  59	staging_areas = None
  60	_queue_position = None
  61
  62	def __init__(self, parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None,
  63				 type=None, is_private=True, owner="anonymous", modules=None):
  64		"""
  65		Create new dataset object
  66
  67		If the dataset is not in the database yet, it is added.
  68
  69		:param dict parameters:  Only when creating a new dataset. Dataset
  70		parameters, free-form dictionary.
  71		:param str key: Dataset key. If given, dataset with this key is loaded.
  72		:param int job: Job ID. If given, dataset corresponding to job is
  73		loaded.
  74		:param dict data: Dataset data, corresponding to a row in the datasets
  75		database table. If not given, retrieved from database depending on key.
  76		:param db:  Database connection
  77		:param str parent:  Only when creating a new dataset. Parent dataset
  78		key to which the one being created is a child.
  79		:param str extension: Only when creating a new dataset. Extension of
  80		dataset result file.
  81		:param str type: Only when creating a new dataset. Type of the dataset,
  82		corresponding to the type property of a processor class.
  83		:param bool is_private: Only when creating a new dataset. Whether the
  84		dataset is private or public.
  85		:param str owner: Only when creating a new dataset. The user name of
  86		the dataset's creator.
  87		:param modules: Module cache. If not given, will be loaded when needed
  88		(expensive). Used to figure out what processors are compatible with
  89		this dataset.
  90		"""
  91		self.db = db
  92		self.folder = config.get('PATH_ROOT').joinpath(config.get('PATH_DATA'))
  93		# Ensure mutable attributes are set in __init__ as they are unique to each DataSet
  94		self.data = {}
  95		self.parameters = {}
  96		self.children = []
  97		self.available_processors = {}
  98		self.genealogy = []
  99		self.staging_areas = []
 100		self.modules = modules
 101
 102		if key is not None:
 103			self.key = key
 104			current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
 105			if not current:
 106				raise DataSetNotFoundException("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key)
 107
 108		elif job is not None:
 109			current = self.db.fetchone("SELECT * FROM datasets WHERE parameters::json->>'job' = %s", (job,))
 110			if not current:
 111				raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
 112
 113			self.key = current["key"]
 114		elif data is not None:
 115			current = data
 116			if "query" not in data or "key" not in data or "parameters" not in data or "key_parent" not in data:
 117				raise DataSetException("DataSet() requires a complete dataset record for its 'data' argument")
 118
 119			self.key = current["key"]
 120		else:
 121			if parameters is None:
 122				raise DataSetException("DataSet() requires either 'key', or 'parameters' to be given")
 123
 124			if not type:
 125				raise DataSetException("Datasets must have their type set explicitly")
 126
 127			query = self.get_label(parameters, default=type)
 128			self.key = self.get_key(query, parameters, parent)
 129			current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s AND query = %s", (self.key, query))
 130
 131		if current:
 132			self.data = current
 133			self.parameters = json.loads(self.data["parameters"])
 134			self.is_new = False
 135		else:
 136			self.data = {"type": type}  # get_own_processor needs this
 137			own_processor = self.get_own_processor()
 138			version = get_software_commit(own_processor)
 139			self.data = {
 140				"key": self.key,
 141				"query": self.get_label(parameters, default=type),
 142				"parameters": json.dumps(parameters),
 143				"result_file": "",
 144				"creator": owner,
 145				"status": "",
 146				"type": type,
 147				"timestamp": int(time.time()),
 148				"is_finished": False,
 149				"is_private": is_private,
 150				"software_version": version[0],
 151				"software_source": version[1],
 152				"software_file": "",
 153				"num_rows": 0,
 154				"progress": 0.0,
 155				"key_parent": parent
 156			}
 157			self.parameters = parameters
 158
 159			self.db.insert("datasets", data=self.data)
 160			self.refresh_owners()
 161			self.add_owner(owner)
 162
 163			# Find desired extension from processor if not explicitly set
 164			if extension is None:
 165				if own_processor:
 166					extension = own_processor.get_extension(parent_dataset=DataSet(key=parent, db=db, modules=self.modules) if parent else None)
 167				# Still no extension, default to 'csv'
 168				if not extension:
 169					extension = "csv"
 170
 171			# Reserve filename and update data['result_file']
 172			self.reserve_result_file(parameters, extension)
 173
 174		# retrieve analyses and processors that may be run for this dataset
 175		analyses = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", (self.key,))
 176		self.children = sorted([DataSet(data=analysis, db=self.db, modules=self.modules) for analysis in analyses],
 177							   key=lambda dataset: dataset.is_finished(), reverse=True)
 178
 179		self.refresh_owners()
 180
 181	def check_dataset_finished(self):
 182		"""
 183		Checks if dataset is finished. Returns path to results file is not empty,
 184		or 'empty_file' when there were not matches.
 185
 186		Only returns a path if the dataset is complete. In other words, if this
 187		method returns a path, a file with the complete results for this dataset
 188		will exist at that location.
 189
 190		:return: A path to the results file, 'empty_file', or `None`
 191		"""
 192		if self.data["is_finished"] and self.data["num_rows"] > 0:
 193			return self.folder.joinpath(self.data["result_file"])
 194		elif self.data["is_finished"] and self.data["num_rows"] == 0:
 195			return 'empty'
 196		else:
 197			return None
 198
 199	def get_results_path(self):
 200		"""
 201		Get path to results file
 202
 203		Always returns a path, that will at some point contain the dataset
 204		data, but may not do so yet. Use this to get the location to write
 205		generated results to.
 206
 207		:return Path:  A path to the results file
 208		"""
 209		return self.folder.joinpath(self.data["result_file"])
 210
 211	def get_results_folder_path(self):
 212		"""
 213		Get path to folder containing accompanying results
 214
 215		Returns a path that may not yet be created
 216
 217		:return Path:  A path to the results file
 218		"""
 219		return self.folder.joinpath("folder_" + self.key)
 220
 221	def get_log_path(self):
 222		"""
 223		Get path to dataset log file
 224
 225		Each dataset has a single log file that documents its creation. This
 226		method returns the path to that file. It is identical to the path of
 227		the dataset result file, with 'log' as its extension instead.
 228
 229		:return Path:  A path to the log file
 230		"""
 231		return self.get_results_path().with_suffix(".log")
 232
 233	def clear_log(self):
 234		"""
 235		Clears the dataset log file
 236
 237		If the log file does not exist, it is created empty. The log file will
 238		have the same file name as the dataset result file, with the 'log'
 239		extension.
 240		"""
 241		log_path = self.get_log_path()
 242		with log_path.open("w") as outfile:
 243			pass
 244
 245	def log(self, log):
 246		"""
 247		Write log message to file
 248
 249		Writes the log message to the log file on a new line, including a
 250		timestamp at the start of the line. Note that this assumes the log file
 251		already exists - it should have been created/cleared with clear_log()
 252		prior to calling this.
 253
 254		:param str log:  Log message to write
 255		"""
 256		log_path = self.get_log_path()
 257		with log_path.open("a", encoding="utf-8") as outfile:
 258			outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
 259
 260	def _iterate_items(self, processor=None):
 261		"""
 262		A generator that iterates through a CSV or NDJSON file
 263
 264		This is an internal method and should not be called directly. Rather,
 265		call iterate_items() and use the generated dictionary and its properties.
 266
 267		If a reference to a processor is provided, with every iteration,
 268		the processor's 'interrupted' flag is checked, and if set a
 269		ProcessorInterruptedException is raised, which by default is caught
 270		in the worker and subsequently stops execution gracefully.
 271
 272		There are two file types that can be iterated (currently): CSV files
 273		and NDJSON (newline-delimited JSON) files. In the future, one could
 274		envision adding a pathway to retrieve items from e.g. a MongoDB
 275		collection directly instead of from a static file
 276
 277		:param BasicProcessor processor:  A reference to the processor
 278		iterating the dataset.
 279		:return generator:  A generator that yields each item as a dictionary
 280		"""
 281		path = self.get_results_path()
 282
 283		# Yield through items one by one
 284		if path.suffix.lower() == ".csv":
 285			with path.open("rb") as infile:
 286				wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
 287				reader = csv.DictReader(wrapped_infile)
 288
 289				if not self.get_own_processor():
 290					# Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
 291					first_item = next(reader)
 292					if first_item is None or any([True for key in first_item if type(key) is not str]):
 293						raise NotImplementedError(f"Cannot iterate through CSV file (deprecated processor {self.type})")
 294					yield first_item
 295
 296				for item in reader:
 297					if hasattr(processor, "interrupted") and processor.interrupted:
 298						raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file")
 299
 300					yield item
 301
 302		elif path.suffix.lower() == ".ndjson":
 303			# In NDJSON format each line in the file is a self-contained JSON
 304			with path.open(encoding="utf-8") as infile:
 305				for line in infile:
 306					if hasattr(processor, "interrupted") and processor.interrupted:
 307						raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file")
 308
 309					yield json.loads(line)
 310
 311		else:
 312			raise NotImplementedError("Cannot iterate through %s file" % path.suffix)
 313
 314	def iterate_items(self, processor=None, warn_unmappable=True, map_missing="default"):
 315		"""
 316		Generate mapped dataset items
 317
 318		Wrapper for _iterate_items that returns a DatasetItem, which can be
 319		accessed as a dict returning the original item or (if a mapper is
 320		available) the mapped item. Mapped or original versions of the item can
 321		also be accessed via the `original` and `mapped_object` properties of
 322		the DatasetItem.
 323
 324		Processors can define a method called `map_item` that can be used to map
 325		an item from the dataset file before it is processed any further. This is
 326		slower than storing the data file in the right format to begin with but
 327		not all data sources allow for easy 'flat' mapping of items, e.g. tweets
 328		are nested objects when retrieved from the twitter API that are easier
 329		to store as a JSON file than as a flat CSV file, and it would be a shame
 330		to throw away that data.
 331
 332		Note the two parameters warn_unmappable and map_missing. Items can be
 333		unmappable in that their structure is too different to coerce into a
 334		neat dictionary of the structure the data source expects. This makes it
 335		'unmappable' and warn_unmappable determines what happens in this case.
 336		It can also be of the right structure, but with some fields missing or
 337		incomplete. map_missing determines what happens in that case. The
 338		latter is for example possible when importing data via Zeeschuimer,
 339		which produces unstably-structured data captured from social media
 340		sites.
 341
 342		:param BasicProcessor processor:  A reference to the processor
 343		iterating the dataset.
 344		:param bool warn_unmappable:  If an item is not mappable, skip the item
 345		and log a warning
 346		:param map_missing: Indicates what to do with mapped items for which
 347		some fields could not be mapped. Defaults to 'empty_str'. Must be one of:
 348		- 'default': fill missing fields with the default passed by map_item
 349		- 'abort': raise a MappedItemIncompleteException if a field is missing
 350		- a callback: replace missing field with the return value of the
 351		  callback. The MappedItem object is passed to the callback as the
 352		  first argument and the name of the missing field as the second.
 353		- a dictionary with a key for each possible missing field: replace missing
 354		  field with a strategy for that field ('default', 'abort', or a callback)
 355
 356		:return generator:  A generator that yields DatasetItems
 357		"""
 358		unmapped_items = False
 359		# Collect item_mapper for use with filter
 360		item_mapper = False
 361		own_processor = self.get_own_processor()
 362		if own_processor and own_processor.map_item_method_available(dataset=self):
 363			item_mapper = True
 364
 365		# missing field strategy can be for all fields at once, or per field
 366		# if it is per field, it is a dictionary with field names and their strategy
 367		# if it is for all fields, it is may be a callback, 'abort', or 'default'
 368		default_strategy = "default"
 369		if type(map_missing) is not dict:
 370			default_strategy = map_missing
 371			map_missing = {}
 372
 373		# Loop through items
 374		for i, item in enumerate(self._iterate_items(processor)):
 375			# Save original to yield
 376			original_item = item.copy()
 377
 378			# Map item
 379			if item_mapper:
 380				try:
 381					mapped_item = own_processor.get_mapped_item(item)
 382				except MapItemException as e:
 383					if warn_unmappable:
 384						self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
 385						unmapped_items = True
 386					continue
 387
 388				# check if fields have been marked as 'missing' in the
 389				# underlying data, and treat according to the chosen strategy
 390				if mapped_item.get_missing_fields():
 391					for missing_field in mapped_item.get_missing_fields():
 392						strategy = map_missing.get(missing_field, default_strategy)
 393
 394						if callable(strategy):
 395							# delegate handling to a callback
 396							mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field)
 397						elif strategy == "abort":
 398							# raise an exception to be handled at the processor level
 399							raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.")
 400						elif strategy == "default":
 401							# use whatever was passed to the object constructor
 402							mapped_item.data[missing_field] = mapped_item.data[missing_field].value
 403						else:
 404							raise ValueError("map_missing must be 'abort', 'default', or a callback.")
 405
 406			else:
 407				mapped_item = original_item
 408
 409			# yield a DatasetItem, which is a dict with some special properties
 410			yield DatasetItem(mapper=item_mapper, original=original_item, mapped_object=mapped_item, **(mapped_item.get_item_data() if type(mapped_item) is MappedItem else mapped_item))
 411
 412	def get_staging_area(self):
 413		"""
 414		Get path to a temporary folder in which files can be stored before
 415		finishing
 416
 417		This folder must be created before use, but is guaranteed to not exist
 418		yet. The folder may be used as a staging area for the dataset data
 419		while it is being processed.
 420
 421		:return Path:  Path to folder
 422		"""
 423		results_file = self.get_results_path()
 424
 425		results_dir_base = results_file.parent
 426		results_dir = results_file.name.replace(".", "") + "-staging"
 427		results_path = results_dir_base.joinpath(results_dir)
 428		index = 1
 429		while results_path.exists():
 430			results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
 431			index += 1
 432
 433		# create temporary folder
 434		results_path.mkdir()
 435
 436		# Storing the staging area with the dataset so that it can be removed later
 437		self.staging_areas.append(results_path)
 438
 439		return results_path
 440
 441	def remove_staging_areas(self):
 442		"""
 443		Remove any staging areas that were created and all files contained in them.
 444		"""
 445		# Remove DataSet staging areas
 446		if self.staging_areas:
 447			for staging_area in self.staging_areas:
 448				if staging_area.is_dir():
 449					shutil.rmtree(staging_area)
 450
 451	def finish(self, num_rows=0):
 452		"""
 453		Declare the dataset finished
 454		"""
 455		if self.data["is_finished"]:
 456			raise RuntimeError("Cannot finish a finished dataset again")
 457
 458		self.db.update("datasets", where={"key": self.data["key"]},
 459					   data={"is_finished": True, "num_rows": num_rows, "progress": 1.0, "timestamp_finished": int(time.time())})
 460		self.data["is_finished"] = True
 461		self.data["num_rows"] = num_rows
 462
 463	def copy(self, shallow=True):
 464		"""
 465		Copies the dataset, making a new version with a unique key
 466
 467
 468		:param bool shallow:  Shallow copy: does not copy the result file, but
 469		instead refers to the same file as the original dataset did
 470		:return Dataset:  Copied dataset
 471		"""
 472		parameters = self.parameters.copy()
 473
 474		# a key is partially based on the parameters. so by setting these extra
 475		# attributes, we also ensure a unique key will be generated for the
 476		# copy
 477		# possibly todo: don't use time for uniqueness (but one shouldn't be
 478		# copying a dataset multiple times per microsecond, that's not what
 479		# this is for)
 480		parameters["copied_from"] = self.key
 481		parameters["copied_at"] = time.time()
 482
 483		copy = DataSet(parameters=parameters, db=self.db, extension=self.result_file.split(".")[-1], type=self.type, modules=self.modules)
 484		for field in self.data:
 485			if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
 486				continue
 487
 488			copy.__setattr__(field, self.data[field])
 489
 490		if shallow:
 491			# use the same result file
 492			copy.result_file = self.result_file
 493		else:
 494			# copy to new file with new key
 495			shutil.copy(self.get_results_path(), copy.get_results_path())
 496
 497		if self.is_finished():
 498			copy.finish(self.num_rows)
 499
 500		# make sure ownership is also copied
 501		copy.copy_ownership_from(self)
 502
 503		return copy
 504
 505	def delete(self, commit=True, queue=None):
 506		"""
 507		Delete the dataset, and all its children
 508
 509		Deletes both database records and result files. Note that manipulating
 510		a dataset object after it has been deleted is undefined behaviour.
 511
 512		:param bool commit:  Commit SQL DELETE query?
 513		"""
 514		# first, recursively delete children
 515		children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
 516		for child in children:
 517			try:
 518				child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 519				child.delete(commit=commit)
 520			except DataSetException:
 521				# dataset already deleted - race condition?
 522				pass
 523
 524		# delete any queued jobs for this dataset
 525		try:
 526			job = Job.get_by_remote_ID(self.key, self.db, self.type)
 527			if job.is_claimed:
 528				# tell API to stop any jobs running for this dataset
 529				# level 2 = cancel job
 530				# we're not interested in the result - if the API is available,
 531				# it will do its thing, if it's not the backend is probably not
 532				# running so the job also doesn't need to be interrupted
 533				call_api(
 534					"cancel-job",
 535					{"remote_id": self.key, "jobtype": self.type, "level": 2},
 536					False
 537				)
 538
 539			# this deletes the job from the database
 540			job.finish(True)
 541
 542		except JobNotFoundException:
 543			pass
 544
 545		# delete from database
 546		self.db.delete("datasets", where={"key": self.key}, commit=commit)
 547		self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
 548		self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
 549
 550		# delete from drive
 551		try:
 552			if self.get_results_path().exists():
 553				self.get_results_path().unlink()
 554			if self.get_results_path().with_suffix(".log").exists():
 555				self.get_results_path().with_suffix(".log").unlink()
 556			if self.get_results_folder_path().exists():
 557				shutil.rmtree(self.get_results_folder_path())
 558
 559		except FileNotFoundError:
 560			# already deleted, apparently
 561			pass
 562		except PermissionError as e:
 563			self.db.log.error(f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}")
 564
 565	def update_children(self, **kwargs):
 566		"""
 567		Update an attribute for all child datasets
 568
 569		Can be used to e.g. change the owner, version, finished status for all
 570		datasets in a tree
 571
 572		:param kwargs:  Parameters corresponding to known dataset attributes
 573		"""
 574		children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
 575		for child in children:
 576			child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 577			for attr, value in kwargs.items():
 578				child.__setattr__(attr, value)
 579
 580			child.update_children(**kwargs)
 581
 582	def is_finished(self):
 583		"""
 584		Check if dataset is finished
 585		:return bool:
 586		"""
 587		return self.data["is_finished"] == True
 588
 589	def is_rankable(self, multiple_items=True):
 590		"""
 591		Determine if a dataset is rankable
 592
 593		Rankable means that it is a CSV file with 'date' and 'value' columns
 594		as well as one or more item label columns
 595
 596		:param bool multiple_items:  Consider datasets with multiple items per
 597		item (e.g. word_1, word_2, etc)?
 598
 599		:return bool:  Whether the dataset is rankable or not
 600		"""
 601		if self.get_results_path().suffix != ".csv" or not self.get_results_path().exists():
 602			return False
 603
 604		column_options = {"date", "value", "item"}
 605		if multiple_items:
 606			column_options.add("word_1")
 607
 608		with self.get_results_path().open(encoding="utf-8") as infile:
 609			reader = csv.DictReader(infile)
 610			try:
 611				return len(set(reader.fieldnames) & column_options) >= 3
 612			except (TypeError, ValueError):
 613				return False
 614
 615	def is_accessible_by(self, username, role="owner"):
 616		"""
 617		Check if dataset has given user as owner
 618
 619		:param str|User username: Username to check for
 620		:return bool:
 621		"""
 622		if type(username) is not str:
 623			if hasattr(username, "get_id"):
 624				username = username.get_id()
 625			else:
 626				raise TypeError("User must be a str or User object")
 627
 628		# 'normal' owners
 629		if username in [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]:
 630			return True
 631
 632		# owners that are owner by being part of a tag
 633		if username in itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if (role is None or self.owners[f"tag:{tag}"]["role"] == role)]):
 634			return True
 635
 636		return False
 637
 638	def get_owners_users(self, role="owner"):
 639		"""
 640		Get list of dataset owners
 641
 642		This returns a list of *users* that are considered owners. Tags are
 643		transparently replaced with the users with that tag.
 644
 645		:param str|None role:  Role to check for. If `None`, all owners are
 646		returned regardless of role.
 647
 648		:return set:  De-duplicated owner list
 649		"""
 650		# 'normal' owners
 651		owners = [owner for owner, meta in self.owners.items() if
 652				  (role is None or meta["role"] == role) and not owner.startswith("tag:")]
 653
 654		# owners that are owner by being part of a tag
 655		owners.extend(itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if
 656									   role is None or self.owners[f"tag:{tag}"]["role"] == role]))
 657
 658		# de-duplicate before returning
 659		return set(owners)
 660
 661	def get_owners(self, role="owner"):
 662		"""
 663		Get list of dataset owners
 664
 665		This returns a list of all owners, and does not transparently resolve
 666		tags (like `get_owners_users` does).
 667
 668		:param str|None role:  Role to check for. If `None`, all owners are
 669		returned regardless of role.
 670
 671		:return set:  De-duplicated owner list
 672		"""
 673		return [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]
 674
 675	def add_owner(self, username, role="owner"):
 676		"""
 677		Set dataset owner
 678
 679		If the user is already an owner, but with a different role, the role is
 680		updated. If the user is already an owner with the same role, nothing happens.
 681
 682		:param str|User username:  Username to set as owner
 683		:param str|None role:  Role to add user with.
 684		"""
 685		if type(username) is not str:
 686			if hasattr(username, "get_id"):
 687				username = username.get_id()
 688			else:
 689				raise TypeError("User must be a str or User object")
 690
 691		if username not in self.owners:
 692			self.owners[username] = {
 693				"name": username,
 694				"key": self.key,
 695				"role": role
 696			}
 697			self.db.insert("datasets_owners", data=self.owners[username], safe=True)
 698
 699		elif username in self.owners and self.owners[username]["role"] != role:
 700			self.db.update("datasets_owners", data={"role": role}, where={"name": username, "key": self.key})
 701			self.owners[username]["role"] = role
 702
 703		if username.startswith("tag:"):
 704			# this is a bit more complicated than just adding to the list of
 705			# owners, so do a full refresh
 706			self.refresh_owners()
 707
 708		# make sure children's owners remain in sync
 709		for child in self.children:
 710			child.add_owner(username, role)
 711			# not recursive, since we're calling it from recursive code!
 712			child.copy_ownership_from(self, recursive=False)
 713
 714	def remove_owner(self, username):
 715		"""
 716		Remove dataset owner
 717
 718		If no owner is set, the dataset is assigned to the anonymous user.
 719		If the user is not an owner, nothing happens.
 720
 721		:param str|User username:  Username to set as owner
 722		"""
 723		if type(username) is not str:
 724			if hasattr(username, "get_id"):
 725				username = username.get_id()
 726			else:
 727				raise TypeError("User must be a str or User object")
 728
 729		if username in self.owners:
 730			del self.owners[username]
 731			self.db.delete("datasets_owners", where={"name": username, "key": self.key})
 732
 733			if not self.owners:
 734				self.add_owner("anonymous")
 735
 736		if username in self.tagged_owners:
 737			del self.tagged_owners[username]
 738
 739		# make sure children's owners remain in sync
 740		for child in self.children:
 741			child.remove_owner(username)
 742			# not recursive, since we're calling it from recursive code!
 743			child.copy_ownership_from(self, recursive=False)
 744
 745	def refresh_owners(self):
 746		"""
 747		Update internal owner cache
 748
 749		This makes sure that the list of *users* and *tags* which can access the
 750		dataset is up to date.
 751		"""
 752		self.owners = {owner["name"]: owner for owner in self.db.fetchall("SELECT * FROM datasets_owners WHERE key = %s", (self.key,))}
 753
 754		# determine which users (if any) are owners of the dataset by having a
 755		# tag that is listed as an owner
 756		owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
 757		if owner_tags:
 758			tagged_owners = self.db.fetchall("SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,))
 759			self.tagged_owners = {
 760				owner_tag: [user["name"] for user in tagged_owners if owner_tag in user["tags"]]
 761				for owner_tag in owner_tags
 762			}
 763		else:
 764			self.tagged_owners = {}
 765
 766	def copy_ownership_from(self, dataset, recursive=True):
 767		"""
 768		Copy ownership
 769
 770		This is useful to e.g. make sure a dataset's ownership stays in sync
 771		with its parent
 772
 773		:param Dataset dataset:  Parent to copy from
 774		:return:
 775		"""
 776		self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
 777
 778		for role in ("owner", "viewer"):
 779			owners = dataset.get_owners(role=role)
 780			for owner in owners:
 781				self.db.insert("datasets_owners", data={"key": self.key, "name": owner, "role": role}, commit=False, safe=True)
 782
 783		self.db.commit()
 784		if recursive:
 785			for child in self.children:
 786				child.copy_ownership_from(self, recursive=recursive)
 787
 788	def get_parameters(self):
 789		"""
 790		Get dataset parameters
 791
 792		The dataset parameters are stored as JSON in the database - parse them
 793		and return the resulting object
 794
 795		:return:  Dataset parameters as originally stored
 796		"""
 797		try:
 798			return json.loads(self.data["parameters"])
 799		except json.JSONDecodeError:
 800			return {}
 801
 802	def get_columns(self):
 803		"""
 804		Returns the dataset columns.
 805
 806		Useful for processor input forms. Can deal with both CSV and NDJSON
 807		files, the latter only if a `map_item` function is available in the
 808		processor that generated it. While in other cases one could use the
 809		keys of the JSON object, this is not always possible in follow-up code
 810		that uses the 'column' names, so for consistency this function acts as
 811		if no column can be parsed if no `map_item` function exists.
 812
 813		:return list:  List of dataset columns; empty list if unable to parse
 814		"""
 815		if not self.get_results_path().exists():
 816			# no file to get columns from
 817			return []
 818
 819		if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)):
 820			items = self.iterate_items(warn_unmappable=False)
 821			try:
 822				keys = list(items.__next__().keys())
 823			except (StopIteration, NotImplementedError):
 824				# No items or otherwise unable to iterate
 825				return []
 826			finally:
 827				del items
 828			return keys
 829		else:
 830			# Filetype not CSV or an NDJSON with `map_item`
 831			return []
 832
 833	def get_annotation_fields(self):
 834		"""
 835		Retrieves the saved annotation fields for this dataset.
 836		:return dict: The saved annotation fields.
 837		"""
 838
 839		annotation_fields = self.db.fetchone("SELECT annotation_fields FROM datasets WHERE key = %s;", (self.top_parent().key,))
 840		
 841		if annotation_fields and annotation_fields.get("annotation_fields"):
 842			annotation_fields = json.loads(annotation_fields["annotation_fields"])
 843		else:
 844			annotation_fields = {}
 845
 846		return annotation_fields
 847
 848	def get_annotations(self):
 849		"""
 850		Retrieves the annotations for this dataset.
 851		return dict: The annotations
 852		"""
 853
 854		annotations = self.db.fetchone("SELECT annotations FROM annotations WHERE key = %s;", (self.top_parent().key,))
 855
 856		if annotations and annotations.get("annotations"):
 857			return json.loads(annotations["annotations"])
 858		else:
 859			return None
 860
 861	def update_label(self, label):
 862		"""
 863		Update label for this dataset
 864
 865		:param str label:  New label
 866		:return str:  The new label, as returned by get_label
 867		"""
 868		self.parameters["label"] = label
 869
 870		self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
 871		return self.get_label()
 872
 873	def get_label(self, parameters=None, default="Query"):
 874		"""
 875		Generate a readable label for the dataset
 876
 877		:param dict parameters:  Parameters of the dataset
 878		:param str default:  Label to use if it cannot be inferred from the
 879		parameters
 880
 881		:return str:  Label
 882		"""
 883		if not parameters:
 884			parameters = self.parameters
 885
 886		if parameters.get("label"):
 887			return parameters["label"]
 888		elif parameters.get("body_query") and parameters["body_query"] != "empty":
 889			return parameters["body_query"]
 890		elif parameters.get("body_match") and parameters["body_match"] != "empty":
 891			return parameters["body_match"]
 892		elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
 893			return parameters["subject_query"]
 894		elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
 895			return parameters["subject_match"]
 896		elif parameters.get("query"):
 897			label = parameters["query"]
 898			# Some legacy datasets have lists as query data
 899			if isinstance(label, list):
 900				label = ", ".join(label)
 901
 902			label = label if len(label) < 30 else label[:25] + "..."
 903			label = label.strip().replace("\n", ", ")
 904			return label
 905		elif parameters.get("country_flag") and parameters["country_flag"] != "all":
 906			return "Flag: %s" % parameters["country_flag"]
 907		elif parameters.get("country_name") and parameters["country_name"] != "all":
 908			return "Country: %s" % parameters["country_name"]
 909		elif parameters.get("filename"):
 910			return parameters["filename"]
 911		elif parameters.get("board") and "datasource" in parameters:
 912			return parameters["datasource"] + "/" + parameters["board"]
 913		elif "datasource" in parameters and parameters["datasource"] in self.modules.datasources:
 914			return self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
 915		else:
 916			return default
 917
 918	def change_datasource(self, datasource):
 919		"""
 920		Change the datasource type for this dataset
 921
 922		:param str label:  New datasource type
 923		:return str:  The new datasource type
 924		"""
 925
 926		self.parameters["datasource"] = datasource
 927
 928		self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
 929		return datasource
 930
 931	def reserve_result_file(self, parameters=None, extension="csv"):
 932		"""
 933		Generate a unique path to the results file for this dataset
 934
 935		This generates a file name for the data file of this dataset, and makes sure
 936		no file exists or will exist at that location other than the file we
 937		expect (i.e. the data for this particular dataset).
 938
 939		:param str extension: File extension, "csv" by default
 940		:param parameters:  Dataset parameters
 941		:return bool:  Whether the file path was successfully reserved
 942		"""
 943		if self.data["is_finished"]:
 944			raise RuntimeError("Cannot reserve results file for a finished dataset")
 945
 946		# Use 'random' for random post queries
 947		if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
 948			file = 'random-' + str(parameters["random_amount"]) + '-' + self.data["key"]
 949		# Use country code for country flag queries
 950		elif "country_flag" in parameters and parameters["country_flag"] != 'all':
 951			file = 'countryflag-' + str(parameters["country_flag"]) + '-' + self.data["key"]
 952		# Use the query string for all other queries
 953		else:
 954			query_bit = self.data["query"].replace(" ", "-").lower()
 955			query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
 956			query_bit = query_bit[:100]  # Crop to avoid OSError
 957			file = query_bit + "-" + self.data["key"]
 958			file = re.sub(r"[-]+", "-", file)
 959
 960		path = self.folder.joinpath(file + "." + extension.lower())
 961		index = 1
 962		while path.is_file():
 963			path = self.folder.joinpath(file + "-" + str(index) + "." + extension.lower())
 964			index += 1
 965
 966		file = path.name
 967		updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
 968								 data={"result_file": file})
 969		self.data["result_file"] = file
 970		return updated > 0
 971
 972	def get_key(self, query, parameters, parent="", time_offset=0):
 973		"""
 974		Generate a unique key for this dataset that can be used to identify it
 975
 976		The key is a hash of a combination of the query string and parameters.
 977		You never need to call this, really: it's used internally.
 978
 979		:param str query:  Query string
 980		:param parameters:  Dataset parameters
 981		:param parent: Parent dataset's key (if applicable)
 982		:param time_offset:  Offset to add to the time component of the dataset
 983		key. This can be used to ensure a unique key even if the parameters and
 984		timing is otherwise identical to an existing dataset's
 985
 986		:return str:  Dataset key
 987		"""
 988		# Return a hash based on parameters
 989		# we're going to use the hash of the parameters to uniquely identify
 990		# the dataset, so make sure it's always in the same order, or we might
 991		# end up creating multiple keys for the same dataset if python
 992		# decides to return the dict in a different order
 993		param_key = collections.OrderedDict()
 994		for key in sorted(parameters):
 995			param_key[key] = parameters[key]
 996
 997		# we additionally use the current time as a salt - this should usually
 998		# ensure a unique key for the dataset. if for some reason there is a
 999		# hash collision
1000		param_key["_salt"] = int(time.time()) + time_offset
1001
1002		parent_key = str(parent) if parent else ""
1003		plain_key = repr(param_key) + str(query) + parent_key
1004		hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()
1005
1006		if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1007			# key exists, generate a new one
1008			return self.get_key(query, parameters, parent, time_offset=random.randint(1,10))
1009		else:
1010			return hashed_key
1011
1012	def set_key(self, key):
1013		"""
1014		Change dataset key
1015
1016		In principe, keys should never be changed. But there are rare cases
1017		where it is useful to do so, in particular when importing a dataset
1018		from another 4CAT instance; in that case it makes sense to try and
1019		ensure that the key is the same as it was before. This function sets
1020		the dataset key and updates any dataset references to it.
1021
1022		:param str key:  Key to set
1023		:return str:  Key that was set. If the desired key already exists, the
1024		original key is kept.
1025		"""
1026		key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1027		if key_exists or not key:
1028			return self.key
1029
1030		old_key = self.key
1031		self.db.update("datasets", data={"key": key}, where={"key": old_key})
1032
1033		# update references
1034		self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key})
1035		self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1036		self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1037		self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1038
1039		# for good measure
1040		self.db.commit()
1041		self.key = key
1042
1043		return self.key
1044
1045	def get_status(self):
1046		"""
1047		Get Dataset status
1048
1049		:return string: Dataset status
1050		"""
1051		return self.data["status"]
1052
1053	def update_status(self, status, is_final=False):
1054		"""
1055		Update dataset status
1056
1057		The status is a string that may be displayed to a user to keep them
1058		updated and informed about the progress of a dataset. No memory is kept
1059		of earlier dataset statuses; the current status is overwritten when
1060		updated.
1061
1062		Statuses are also written to the dataset log file.
1063
1064		:param string status:  Dataset status
1065		:param bool is_final:  If this is `True`, subsequent calls to this
1066		method while the object is instantiated will not update the dataset
1067		status.
1068		:return bool:  Status update successful?
1069		"""
1070		if self.no_status_updates:
1071			return
1072
1073		# for presets, copy the updated status to the preset(s) this is part of
1074		if self.preset_parent is None:
1075			self.preset_parent = [parent for parent in self.get_genealogy() if parent.type.find("preset-") == 0 and parent.key != self.key][:1]
1076
1077		if self.preset_parent:
1078			for preset_parent in self.preset_parent:
1079				if not preset_parent.is_finished():
1080					preset_parent.update_status(status)
1081
1082		self.data["status"] = status
1083		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"status": status})
1084
1085		if is_final:
1086			self.no_status_updates = True
1087
1088		self.log(status)
1089
1090		return updated > 0
1091
1092	def update_progress(self, progress):
1093		"""
1094		Update dataset progress
1095
1096		The progress can be used to indicate to a user how close the dataset
1097		is to completion.
1098
1099		:param float progress:  Between 0 and 1.
1100		:return:
1101		"""
1102		progress = min(1, max(0, progress))  # clamp
1103		if type(progress) is int:
1104			progress = float(progress)
1105
1106		self.data["progress"] = progress
1107		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress})
1108		return updated > 0
1109
1110	def get_progress(self):
1111		"""
1112		Get dataset progress
1113
1114		:return float:  Progress, between 0 and 1
1115		"""
1116		return self.data["progress"]
1117
1118	def finish_with_error(self, error):
1119		"""
1120		Set error as final status, and finish with 0 results
1121
1122		This is a convenience function to avoid having to repeat
1123		"update_status" and "finish" a lot.
1124
1125		:param str error:  Error message for final dataset status.
1126		:return:
1127		"""
1128		self.update_status(error, is_final=True)
1129		self.finish(0)
1130
1131		return None
1132
1133	def update_version(self, version):
1134		"""
1135		Update software version used for this dataset
1136
1137		This can be used to verify the code that was used to process this dataset.
1138
1139		:param string version:  Version identifier
1140		:return bool:  Update successul?
1141		"""
1142		try:
1143			# this fails if the processor type is unknown
1144			# edge case, but let's not crash...
1145			processor_path = self.modules.processors.get(self.data["type"]).filepath
1146		except AttributeError:
1147			processor_path = ""
1148
1149		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={
1150			"software_version": version[0],
1151			"software_source": version[1],
1152			"software_file": processor_path
1153		})
1154
1155		return updated > 0
1156
1157	def delete_parameter(self, parameter, instant=True):
1158		"""
1159		Delete a parameter from the dataset metadata
1160
1161		:param string parameter:  Parameter to delete
1162		:param bool instant:  Also delete parameters in this instance object?
1163		:return bool:  Update successul?
1164		"""
1165		parameters = self.parameters.copy()
1166		if parameter in parameters:
1167			del parameters[parameter]
1168		else:
1169			return False
1170
1171		updated = self.db.update("datasets", where={"key": self.data["key"]},
1172								 data={"parameters": json.dumps(parameters)})
1173
1174		if instant:
1175			self.parameters = parameters
1176
1177		return updated > 0
1178
1179	def get_version_url(self, file):
1180		"""
1181		Get a versioned github URL for the version this dataset was processed with
1182
1183		:param file:  File to link within the repository
1184		:return:  URL, or an empty string
1185		"""
1186		if not self.data["software_source"]:
1187			return ""
1188
1189		filepath = self.data.get("software_file", "")
1190		if filepath.startswith("/extensions/"):
1191			# go to root of extension
1192			filepath = "/" + "/".join(filepath.split("/")[3:])
1193
1194		return self.data["software_source"] + "/blob/" + self.data["software_version"] + filepath
1195
1196	def top_parent(self):
1197		"""
1198		Get root dataset
1199
1200		Traverses the tree of datasets this one is part of until it finds one
1201		with no source_dataset dataset, then returns that dataset.
1202
1203		:return Dataset: Parent dataset
1204		"""
1205		genealogy = self.get_genealogy()
1206		return genealogy[0]
1207
1208	def get_genealogy(self, inclusive=False):
1209		"""
1210		Get genealogy of this dataset
1211
1212		Creates a list of DataSet objects, with the first one being the
1213		'top' dataset, and each subsequent one being a child of the previous
1214		one, ending with the current dataset.
1215
1216		:return list:  Dataset genealogy, oldest dataset first
1217		"""
1218		if self.genealogy and not inclusive:
1219			return self.genealogy
1220
1221		key_parent = self.key_parent
1222		genealogy = []
1223
1224		while key_parent:
1225			try:
1226				parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1227			except DataSetException:
1228				break
1229
1230			genealogy.append(parent)
1231			if parent.key_parent:
1232				key_parent = parent.key_parent
1233			else:
1234				break
1235
1236		genealogy.reverse()
1237		genealogy.append(self)
1238
1239		self.genealogy = genealogy
1240		return self.genealogy
1241
1242	def get_all_children(self, recursive=True):
1243		"""
1244		Get all children of this dataset
1245
1246		Results are returned as a non-hierarchical list, i.e. the result does
1247		not reflect the actual dataset hierarchy (but all datasets in the
1248		result will have the original dataset as an ancestor somewhere)
1249
1250		:return list:  List of DataSets
1251		"""
1252		children = [DataSet(data=record, db=self.db, modules=self.modules) for record in self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))]
1253		results = children.copy()
1254		if recursive:
1255			for child in children:
1256				results += child.get_all_children(recursive)
1257
1258		return results
1259
1260	def nearest(self, type_filter):
1261		"""
1262		Return nearest dataset that matches the given type
1263
1264		Starting with this dataset, traverse the hierarchy upwards and return
1265		whichever dataset matches the given type.
1266
1267		:param str type_filter:  Type filter. Can contain wildcards and is matched
1268		using `fnmatch.fnmatch`.
1269		:return:  Earliest matching dataset, or `None` if none match.
1270		"""
1271		genealogy = self.get_genealogy(inclusive=True)
1272		for dataset in reversed(genealogy):
1273			if fnmatch.fnmatch(dataset.type, type_filter):
1274				return dataset
1275
1276		return None
1277
1278	def get_breadcrumbs(self):
1279		"""
1280		Get breadcrumbs navlink for use in permalinks
1281
1282		Returns a string representing this dataset's genealogy that may be used
1283		to uniquely identify it.
1284
1285		:return str: Nav link
1286		"""
1287		if self.genealogy:
1288			return ",".join([dataset.key for dataset in self.genealogy])
1289		else:
1290			# Collect keys only
1291			key_parent = self.key  # Start at the bottom
1292			genealogy = []
1293
1294			while key_parent:
1295				try:
1296					parent = self.db.fetchone("SELECT key_parent FROM datasets WHERE key = %s", (key_parent,))
1297				except TypeError:
1298					break
1299
1300				key_parent = parent["key_parent"]
1301				if key_parent:
1302					genealogy.append(key_parent)
1303				else:
1304					break
1305
1306			genealogy.reverse()
1307			genealogy.append(self.key)
1308			return ",".join(genealogy)
1309
1310	def get_compatible_processors(self, user=None):
1311		"""
1312		Get list of processors compatible with this dataset
1313
1314		Checks whether this dataset type is one that is listed as being accepted
1315		by the processor, for each known type: if the processor does not
1316		specify accepted types (via the `is_compatible_with` method), it is
1317		assumed it accepts any top-level datasets
1318
1319		:param str|User|None user:  User to get compatibility for. If set,
1320		use the user-specific config settings where available.
1321
1322		:return dict:  Compatible processors, `name => class` mapping
1323		"""
1324		processors = self.modules.processors
1325
1326		available = {}
1327		for processor_type, processor in processors.items():
1328			if processor.is_from_collector():
1329				continue
1330
1331			own_processor = self.get_own_processor()
1332			if own_processor and own_processor.exclude_followup_processors(processor_type):
1333				continue
1334
1335			# consider a processor compatible if its is_compatible_with
1336			# method returns True *or* if it has no explicit compatibility
1337			# check and this dataset is top-level (i.e. has no parent)
1338			if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1339					or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, user=user)):
1340				available[processor_type] = processor
1341
1342		return available
1343
1344	def get_place_in_queue(self, update=False):
1345		"""
1346		Determine dataset's position in queue
1347
1348		If the dataset is already finished, the position is -1. Else, the
1349		position is the number of datasets that must be completed before this one will
1350		be processed. A position of 0 would mean that the dataset is currently
1351		being executed, or that the backend is not running.
1352
1353		:param bool update:  Update the queue position from database if True, else return cached value
1354		:return int:  Queue position
1355		"""
1356		if self.is_finished() or not self.data.get("job"):
1357			self._queue_position = -1
1358			return self._queue_position
1359		elif not update and self._queue_position is not None:
1360			# Use cached value
1361			return self._queue_position
1362		else:
1363			# Collect queue position from database via the job
1364			try:
1365				job = Job.get_by_ID(self.data["job"], self.db)
1366				self._queue_position = job.get_place_in_queue()
1367			except JobNotFoundException:
1368				self._queue_position = -1
1369
1370			return self._queue_position
1371
1372	def get_modules(self):
1373		"""
1374		Get 4CAT modules
1375
1376		Is a function because loading them is not free, and this way we can
1377		cache the result.
1378
1379		:return:
1380		"""
1381		if not self.modules:
1382			self.modules = ModuleCollector()
1383
1384		return self.modules
1385
1386	def get_own_processor(self):
1387		"""
1388		Get the processor class that produced this dataset
1389
1390		:return:  Processor class, or `None` if not available.
1391		"""
1392		processor_type = self.parameters.get("type", self.data.get("type"))
1393
1394		return self.modules.processors.get(processor_type)
1395
1396	def get_available_processors(self, user=None, exclude_hidden=False):
1397		"""
1398		Get list of processors that may be run for this dataset
1399
1400		Returns all compatible processors except for those that are already
1401		queued or finished and have no options. Processors that have been
1402		run but have options are included so they may be run again with a
1403		different configuration
1404
1405		:param str|User|None user:  User to get compatibility for. If set,
1406		use the user-specific config settings where available.
1407		:param bool exclude_hidden:  Exclude processors that should not be displayed
1408		in the UI? If `False`, all processors are returned.
1409
1410		:return dict:  Available processors, `name => properties` mapping
1411		"""
1412		if self.available_processors:
1413			# Update to reflect exclude_hidden parameter which may be different from last call
1414			# TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1415			return {processor_type: processor for processor_type, processor in self.available_processors.items() if not exclude_hidden or not processor.is_hidden}
1416
1417		processors = self.get_compatible_processors(user=user)
1418
1419		for analysis in self.children:
1420			if analysis.type not in processors:
1421				continue
1422
1423			if not processors[analysis.type].get_options():
1424				del processors[analysis.type]
1425				continue
1426
1427			if exclude_hidden and processors[analysis.type].is_hidden:
1428				del processors[analysis.type]
1429
1430		self.available_processors = processors
1431		return processors
1432
1433	def link_job(self, job):
1434		"""
1435		Link this dataset to a job ID
1436
1437		Updates the dataset data to include a reference to the job that will be
1438		executing (or has already executed) this dataset.
1439
1440		Note that if no job can be found for this dataset, this method silently
1441		fails.
1442
1443		:param Job job:  The job that will run this dataset
1444
1445		:todo: If the job column ever gets used, make sure it always contains
1446		       a valid value, rather than silently failing this method.
1447		"""
1448		if type(job) != Job:
1449			raise TypeError("link_job requires a Job object as its argument")
1450
1451		if "id" not in job.data:
1452			try:
1453				job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1454			except JobNotFoundException:
1455				return
1456
1457		self.db.update("datasets", where={"key": self.key}, data={"job": job.data["id"]})
1458
1459	def link_parent(self, key_parent):
1460		"""
1461		Set the parent (source dataset) key for this dataset
1462
1463		:param key_parent:  Parent key. Not checked for validity
1464		"""
1465		self.db.update("datasets", where={"key": self.key}, data={"key_parent": key_parent})
1466
1467	def get_parent(self):
1468		"""
1469		Get parent dataset
1470
1471		:return DataSet:  Parent dataset, or `None` if not applicable
1472		"""
1473		return DataSet(key=self.key_parent, db=self.db, modules=self.modules) if self.key_parent else None
1474
1475	def detach(self):
1476		"""
1477		Make the dataset standalone, i.e. detach it from its parent (source) dataset
1478		"""
1479		self.link_parent("")
1480
1481	def is_dataset(self):
1482		"""
1483		Easy way to confirm this is a dataset.
1484		Used for checking processor and dataset compatibility,
1485		which needs to handle both processors and datasets.
1486		"""
1487		return True
1488
1489	def is_top_dataset(self):
1490		"""
1491		Easy way to confirm this is a top dataset.
1492		Used for checking processor and dataset compatibility,
1493		which needs to handle both processors and datasets.
1494		"""
1495		if self.key_parent:
1496			return False
1497		return True
1498
1499	def is_expiring(self, user=None):
1500		"""
1501		Determine if dataset is set to expire
1502
1503		Similar to `is_expired`, but checks if the dataset will be deleted in
1504		the future, not if it should be deleted right now.
1505
1506		:param user:  User to use for configuration context. Provide to make
1507		sure configuration overrides for this user are taken into account.
1508		:return bool|int:  `False`, or the expiration date as a Unix timestamp.
1509		"""
1510		# has someone opted out of deleting this?
1511		if self.parameters.get("keep"):
1512			return False
1513
1514		# is this dataset explicitly marked as expiring after a certain time?
1515		if self.parameters.get("expires-after"):
1516			return self.parameters.get("expires-after")
1517
1518		# is the data source configured to have its datasets expire?
1519		expiration = config.get("datasources.expiration", {}, user=user)
1520		if not expiration.get(self.parameters.get("datasource")):
1521			return False
1522
1523		# is there a timeout for this data source?
1524		if expiration.get(self.parameters.get("datasource")).get("timeout"):
1525			return self.timestamp + expiration.get(self.parameters.get("datasource")).get("timeout")
1526
1527		return False
1528
1529	def is_expired(self, user=None):
1530		"""
1531		Determine if dataset should be deleted
1532
1533		Datasets can be set to expire, but when they should be deleted depends
1534		on a number of factors. This checks them all.
1535
1536		:param user:  User to use for configuration context. Provide to make
1537		sure configuration overrides for this user are taken into account.
1538		:return bool:
1539		"""
1540		# not set to expire at all? (this includes the 'keep' opt-out)
1541		if not self.is_expiring():
1542			return False
1543
1544		# is this dataset explicitly marked as expiring after a certain time?
1545		future = time.time() + 3600  # ensure we don't delete datasets with invalid expiration times
1546		if self.parameters.get("expires-after") and convert_to_int(self.parameters["expires-after"], future) < time.time():
1547			return True
1548
1549		# is the data source configured to have its datasets expire?
1550		expiration = config.get("datasources.expiration", {}, user=user)
1551		if not expiration.get(self.parameters.get("datasource")):
1552			return False
1553
1554		# is the dataset older than the set timeout?
1555		if expiration.get(self.parameters.get("datasource")).get("timeout"):
1556			return self.timestamp + expiration[self.parameters.get("datasource")]["timeout"] < time.time()
1557
1558		return False
1559
1560	def is_from_collector(self):
1561		"""
1562		Check if this dataset was made by a processor that collects data, i.e.
1563		a search or import worker.
1564
1565		:return bool:
1566		"""
1567		return self.type.endswith("-search") or self.type.endswith("-import")
1568
1569	def get_extension(self):
1570		"""
1571		Gets the file extension this dataset produces.
1572		Also checks whether the results file exists.
1573		Used for checking processor and dataset compatibility.
1574
1575		:return str|bool extension:  Extension, e.g. `csv`, or `False` if the results file does not exist
1576		"""
1577		if self.get_results_path().exists():
1578			return self.get_results_path().suffix[1:]
1579
1580		return False
1581
1582	def get_media_type(self):
1583		"""
1584		Gets the media type of the dataset file.
1585
1586		:return str: media type, e.g., "text"
1587		"""
1588		own_processor = self.get_own_processor()
1589		if hasattr(self, "media_type"):
1590			# media type can be defined explicitly in the dataset; this is the priority
1591			return self.media_type
1592		elif own_processor is not None:
1593			# or media type can be defined in the processor
1594			# some processors can set different media types for different datasets (e.g., import_media)
1595			if hasattr(own_processor, "media_type"):
1596				return own_processor.media_type
1597
1598		# Default to text
1599		return self.parameters.get("media_type", "text")
1600
1601	def get_metadata(self):
1602		"""
1603		Get dataset metadata
1604
1605		This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
1606		as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
1607		update its database (and ensure compatibility with the exporting version of 4CAT).
1608		"""
1609		metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
1610
1611		# get 4CAT version (presumably to ensure export is compatible with import)
1612		metadata["current_4CAT_version"] = get_software_version()
1613		return metadata
1614
1615	def get_result_url(self):
1616		"""
1617		Gets the 4CAT frontend URL of a dataset file.
1618
1619		Uses the FlaskConfig attributes (i.e., SERVER_NAME and
1620		SERVER_HTTPS) plus hardcoded '/result/'.
1621		TODO: create more dynamic method of obtaining url.
1622		"""
1623		filename = self.get_results_path().name
1624		url_to_file = ('https://' if config.get("flask.https") else 'http://') + \
1625						config.get("flask.server_name") + '/result/' + filename
1626		return url_to_file
1627
1628	def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
1629		"""
1630		Log an item that is unable to be mapped and warn administrators.
1631
1632		:param int item_count:			Item index
1633		:param Processor processor:		Processor calling this function
1634		"""
1635		dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
1636
1637		# Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
1638		closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self
1639		# Log error to dataset log
1640		closest_dataset.log(dataset_error_message)
1641
1642		if warn_admins:
1643			if processor is not None:
1644				processor.log.warning(f"Processor {processor.type} was unable to map one or more items for dataset {closest_dataset.key}.")
1645			elif hasattr(self.db, "log"):
1646				# borrow the database's log handler
1647				self.db.log.warning(f"Unable to map one or more items for dataset {closest_dataset.key}.")
1648			else:
1649				# No other log available
1650				raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn")
1651
1652	def __getattr__(self, attr):
1653		"""
1654		Getter so we don't have to use .data all the time
1655
1656		:param attr:  Data key to get
1657		:return:  Value
1658		"""
1659
1660		if attr in dir(self):
1661			# an explicitly defined attribute should always be called in favour
1662			# of this passthrough
1663			attribute = getattr(self, attr)
1664			return attribute
1665		elif attr in self.data:
1666			return self.data[attr]
1667		else:
1668			raise AttributeError("DataSet instance has no attribute %s" % attr)
1669
1670	def __setattr__(self, attr, value):
1671		"""
1672		Setter so we can flexibly update the database
1673
1674		Also updates internal data stores (.data etc). If the attribute is
1675		unknown, it is stored within the 'parameters' attribute.
1676
1677		:param str attr:  Attribute to update
1678		:param value:  New value
1679		"""
1680
1681		# don't override behaviour for *actual* class attributes
1682		if attr in dir(self):
1683			super().__setattr__(attr, value)
1684			return
1685
1686		if attr not in self.data:
1687			self.parameters[attr] = value
1688			attr = "parameters"
1689			value = self.parameters
1690
1691		if attr == "parameters":
1692			value = json.dumps(value)
1693
1694		self.db.update("datasets", where={"key": self.key}, data={attr: value})
1695
1696		self.data[attr] = value
1697
1698		if attr == "parameters":
1699			self.parameters = json.loads(value)
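
As a usage illustration of the attribute passthrough implemented by __getattr__ and __setattr__ above (not part of the class source; `dataset` is a hypothetical existing DataSet instance):

    # database columns can be read directly as attributes (falls through to .data)
    print(dataset.num_rows, dataset.result_file)

    # attributes that are neither class attributes nor database columns are
    # stored inside the 'parameters' JSON column and persisted to the database
    dataset.my_custom_flag = True                 # hypothetical parameter name
    print(dataset.parameters["my_custom_flag"])   # True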

Provide interface to safely register and run operations on a dataset

A dataset is a collection of:

  • A unique identifier
  • A set of parameters that demarcate the data contained within
  • The data

The data is usually stored in a file on the disk; the parameters are stored in a database. The handling of the data, et cetera, is done by other workers; this class defines methods to create and manipulate the dataset's properties.

DataSet( parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None, type=None, is_private=True, owner='anonymous', modules=None)
 62	def __init__(self, parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None,
 63				 type=None, is_private=True, owner="anonymous", modules=None):
 64		"""
 65		Create new dataset object
 66
 67		If the dataset is not in the database yet, it is added.
 68
 69		:param dict parameters:  Only when creating a new dataset. Dataset
 70		parameters, free-form dictionary.
 71		:param str key: Dataset key. If given, dataset with this key is loaded.
 72		:param int job: Job ID. If given, dataset corresponding to job is
 73		loaded.
 74		:param dict data: Dataset data, corresponding to a row in the datasets
 75		database table. If not given, retrieved from database depending on key.
 76		:param db:  Database connection
 77		:param str parent:  Only when creating a new dataset. Parent dataset
 78		key to which the one being created is a child.
 79		:param str extension: Only when creating a new dataset. Extension of
 80		dataset result file.
 81		:param str type: Only when creating a new dataset. Type of the dataset,
 82		corresponding to the type property of a processor class.
 83		:param bool is_private: Only when creating a new dataset. Whether the
 84		dataset is private or public.
 85		:param str owner: Only when creating a new dataset. The user name of
 86		the dataset's creator.
 87		:param modules: Module cache. If not given, will be loaded when needed
 88		(expensive). Used to figure out what processors are compatible with
 89		this dataset.
 90		"""
 91		self.db = db
 92		self.folder = config.get('PATH_ROOT').joinpath(config.get('PATH_DATA'))
 93		# Ensure mutable attributes are set in __init__ as they are unique to each DataSet
 94		self.data = {}
 95		self.parameters = {}
 96		self.children = []
 97		self.available_processors = {}
 98		self.genealogy = []
 99		self.staging_areas = []
100		self.modules = modules
101
102		if key is not None:
103			self.key = key
104			current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
105			if not current:
106				raise DataSetNotFoundException("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key)
107
108		elif job is not None:
109			current = self.db.fetchone("SELECT * FROM datasets WHERE parameters::json->>'job' = %s", (job,))
110			if not current:
111				raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
112
113			self.key = current["key"]
114		elif data is not None:
115			current = data
116			if "query" not in data or "key" not in data or "parameters" not in data or "key_parent" not in data:
117				raise DataSetException("DataSet() requires a complete dataset record for its 'data' argument")
118
119			self.key = current["key"]
120		else:
121			if parameters is None:
122				raise DataSetException("DataSet() requires either 'key', or 'parameters' to be given")
123
124			if not type:
125				raise DataSetException("Datasets must have their type set explicitly")
126
127			query = self.get_label(parameters, default=type)
128			self.key = self.get_key(query, parameters, parent)
129			current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s AND query = %s", (self.key, query))
130
131		if current:
132			self.data = current
133			self.parameters = json.loads(self.data["parameters"])
134			self.is_new = False
135		else:
136			self.data = {"type": type}  # get_own_processor needs this
137			own_processor = self.get_own_processor()
138			version = get_software_commit(own_processor)
139			self.data = {
140				"key": self.key,
141				"query": self.get_label(parameters, default=type),
142				"parameters": json.dumps(parameters),
143				"result_file": "",
144				"creator": owner,
145				"status": "",
146				"type": type,
147				"timestamp": int(time.time()),
148				"is_finished": False,
149				"is_private": is_private,
150				"software_version": version[0],
151				"software_source": version[1],
152				"software_file": "",
153				"num_rows": 0,
154				"progress": 0.0,
155				"key_parent": parent
156			}
157			self.parameters = parameters
158
159			self.db.insert("datasets", data=self.data)
160			self.refresh_owners()
161			self.add_owner(owner)
162
163			# Find desired extension from processor if not explicitly set
164			if extension is None:
165				if own_processor:
166					extension = own_processor.get_extension(parent_dataset=DataSet(key=parent, db=db, modules=self.modules) if parent else None)
167				# Still no extension, default to 'csv'
168				if not extension:
169					extension = "csv"
170
171			# Reserve filename and update data['result_file']
172			self.reserve_result_file(parameters, extension)
173
174		# retrieve analyses and processors that may be run for this dataset
175		analyses = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", (self.key,))
176		self.children = sorted([DataSet(data=analysis, db=self.db, modules=self.modules) for analysis in analyses],
177							   key=lambda dataset: dataset.is_finished(), reverse=True)
178
179		self.refresh_owners()

Create new dataset object

If the dataset is not in the database yet, it is added.

Parameters
  • dict parameters: Only when creating a new dataset. Dataset parameters, free-form dictionary.
  • str key: Dataset key. If given, dataset with this key is loaded.
  • int job: Job ID. If given, dataset corresponding to job is loaded.
  • dict data: Dataset data, corresponding to a row in the datasets database table. If not given, retrieved from database depending on key.
  • db: Database connection
  • str parent: Only when creating a new dataset. Parent dataset key to which the one being created is a child.
  • str extension: Only when creating a new dataset. Extension of dataset result file.
  • str type: Only when creating a new dataset. Type of the dataset, corresponding to the type property of a processor class.
  • bool is_private: Only when creating a new dataset. Whether the dataset is private or public.
  • str owner: Only when creating a new dataset. The user name of the dataset's creator.
  • modules: Module cache. If not given, will be loaded when needed (expensive). Used to figure out what processors are compatible with this dataset.
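
As a minimal usage sketch (not part of the documentation source), assuming `db` is an open 4CAT database connection, `modules` an already-built module cache (a ModuleCollector), and `dataset_key` the key of an existing dataset; the processor type and parameter values below are placeholders:

    # load an existing dataset; raises DataSetNotFoundException for unknown keys
    dataset = DataSet(key=dataset_key, db=db, modules=modules)

    # create a new dataset record (normally done by the web frontend or a processor)
    new_dataset = DataSet(
        parameters={"datasource": "upload", "query": "example"},  # placeholder values
        type="upload-search",                                     # placeholder processor type
        db=db,
        owner="some-user",
        extension="csv",
        modules=modules,
    )
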
data = None
key = ''
children = None
available_processors = None
genealogy = None
preset_parent = None
parameters = None
modules = None
owners = None
tagged_owners = None
db = None
folder = None
is_new = True
no_status_updates = False
staging_areas = None
def check_dataset_finished(self):
181	def check_dataset_finished(self):
182		"""
183		Checks if the dataset is finished. Returns the path to the results file if it
184		is not empty, or 'empty' when there were no matches.
185
186		Only returns a path if the dataset is complete. In other words, if this
187		method returns a path, a file with the complete results for this dataset
188		will exist at that location.
189
190		:return: A path to the results file, 'empty', or `None`
191		"""
192		if self.data["is_finished"] and self.data["num_rows"] > 0:
193			return self.folder.joinpath(self.data["result_file"])
194		elif self.data["is_finished"] and self.data["num_rows"] == 0:
195			return 'empty'
196		else:
197			return None

Checks if the dataset is finished. Returns the path to the results file if it is not empty, or 'empty' when there were no matches.

Only returns a path if the dataset is complete. In other words, if this method returns a path, a file with the complete results for this dataset will exist at that location.

Returns

A path to the results file, 'empty', or None
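
A minimal sketch of how the return value might be handled (assuming `dataset` is a DataSet instance):

    result = dataset.check_dataset_finished()
    if result == "empty":
        print("Finished, but no items matched")
    elif result is not None:
        print("Results available at", result)   # a pathlib.Path
    else:
        print("Dataset has not finished yet")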

def get_results_path(self):
199	def get_results_path(self):
200		"""
201		Get path to results file
202
203		Always returns a path, that will at some point contain the dataset
204		data, but may not do so yet. Use this to get the location to write
205		generated results to.
206
207		:return Path:  A path to the results file
208		"""
209		return self.folder.joinpath(self.data["result_file"])

Get path to results file

Always returns a path, that will at some point contain the dataset data, but may not do so yet. Use this to get the location to write generated results to.

Returns

A path to the results file

def get_results_folder_path(self):
211	def get_results_folder_path(self):
212		"""
213		Get path to folder containing accompanying results
214
215		Returns a path that may not yet be created
216
217		:return Path:  A path to the results file
218		"""
219		return self.folder.joinpath("folder_" + self.key)

Get path to folder containing accompanying results

Returns a path that may not yet be created

Returns

A path to the results file

def get_log_path(self):
221	def get_log_path(self):
222		"""
223		Get path to dataset log file
224
225		Each dataset has a single log file that documents its creation. This
226		method returns the path to that file. It is identical to the path of
227		the dataset result file, with 'log' as its extension instead.
228
229		:return Path:  A path to the log file
230		"""
231		return self.get_results_path().with_suffix(".log")

Get path to dataset log file

Each dataset has a single log file that documents its creation. This method returns the path to that file. It is identical to the path of the dataset result file, with 'log' as its extension instead.

Returns

A path to the log file

def clear_log(self):
233	def clear_log(self):
234		"""
235		Clears the dataset log file
236
237		If the log file does not exist, it is created empty. The log file will
238		have the same file name as the dataset result file, with the 'log'
239		extension.
240		"""
241		log_path = self.get_log_path()
242		with log_path.open("w") as outfile:
243			pass

Clears the dataset log file

If the log file does not exist, it is created empty. The log file will have the same file name as the dataset result file, with the 'log' extension.

def log(self, log):
245	def log(self, log):
246		"""
247		Write log message to file
248
249		Writes the log message to the log file on a new line, including a
250		timestamp at the start of the line. Note that this assumes the log file
251		already exists - it should have been created/cleared with clear_log()
252		prior to calling this.
253
254		:param str log:  Log message to write
255		"""
256		log_path = self.get_log_path()
257		with log_path.open("a", encoding="utf-8") as outfile:
258			outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))

Write log message to file

Writes the log message to the log file on a new line, including a timestamp at the start of the line. Note that this assumes the log file already exists - it should have been created/cleared with clear_log() prior to calling this.

Parameters
  • str log: Log message to write
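
For illustration, a small sketch of the intended call order (assuming `dataset` is a DataSet instance; the log message is a placeholder):

    dataset.clear_log()                         # create or empty the .log file first
    dataset.log("Starting a hypothetical processing step")
    print(dataset.get_log_path())               # result file path with a .log suffix
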
def iterate_items(self, processor=None, warn_unmappable=True, map_missing='default'):
314	def iterate_items(self, processor=None, warn_unmappable=True, map_missing="default"):
315		"""
316		Generate mapped dataset items
317
318		Wrapper for _iterate_items that returns a DatasetItem, which can be
319		accessed as a dict returning the original item or (if a mapper is
320		available) the mapped item. Mapped or original versions of the item can
321		also be accessed via the `original` and `mapped_object` properties of
322		the DatasetItem.
323
324		Processors can define a method called `map_item` that can be used to map
325		an item from the dataset file before it is processed any further. This is
326		slower than storing the data file in the right format to begin with but
327		not all data sources allow for easy 'flat' mapping of items, e.g. tweets
328		are nested objects when retrieved from the twitter API that are easier
329		to store as a JSON file than as a flat CSV file, and it would be a shame
330		to throw away that data.
331
332		Note the two parameters warn_unmappable and map_missing. Items can be
333		unmappable in that their structure is too different to coerce into a
334		neat dictionary of the structure the data source expects. This makes it
335		'unmappable' and warn_unmappable determines what happens in this case.
336		It can also be of the right structure, but with some fields missing or
337		incomplete. map_missing determines what happens in that case. The
338		latter is for example possible when importing data via Zeeschuimer,
339		which produces unstably-structured data captured from social media
340		sites.
341
342		:param BasicProcessor processor:  A reference to the processor
343		iterating the dataset.
344		:param bool warn_unmappable:  If an item is not mappable, skip the item
345		and log a warning
346		:param map_missing: Indicates what to do with mapped items for which
347		some fields could not be mapped. Defaults to 'default'. Must be one of:
348		- 'default': fill missing fields with the default passed by map_item
349		- 'abort': raise a MappedItemIncompleteException if a field is missing
350		- a callback: replace missing field with the return value of the
351		  callback. The mapped item's data dictionary is passed to the callback
352		  as the first argument and the name of the missing field as the second.
353		- a dictionary with a key for each possible missing field: replace missing
354		  field with a strategy for that field ('default', 'abort', or a callback)
355
356		:return generator:  A generator that yields DatasetItems
357		"""
358		unmapped_items = False
359		# Collect item_mapper for use with filter
360		item_mapper = False
361		own_processor = self.get_own_processor()
362		if own_processor and own_processor.map_item_method_available(dataset=self):
363			item_mapper = True
364
365		# missing field strategy can be for all fields at once, or per field
366		# if it is per field, it is a dictionary with field names and their strategy
367		# if it is for all fields, it may be a callback, 'abort', or 'default'
368		default_strategy = "default"
369		if type(map_missing) is not dict:
370			default_strategy = map_missing
371			map_missing = {}
372
373		# Loop through items
374		for i, item in enumerate(self._iterate_items(processor)):
375			# Save original to yield
376			original_item = item.copy()
377
378			# Map item
379			if item_mapper:
380				try:
381					mapped_item = own_processor.get_mapped_item(item)
382				except MapItemException as e:
383					if warn_unmappable:
384						self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
385						unmapped_items = True
386					continue
387
388				# check if fields have been marked as 'missing' in the
389				# underlying data, and treat according to the chosen strategy
390				if mapped_item.get_missing_fields():
391					for missing_field in mapped_item.get_missing_fields():
392						strategy = map_missing.get(missing_field, default_strategy)
393
394						if callable(strategy):
395							# delegate handling to a callback
396							mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field)
397						elif strategy == "abort":
398							# raise an exception to be handled at the processor level
399							raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.")
400						elif strategy == "default":
401							# use whatever was passed to the object constructor
402							mapped_item.data[missing_field] = mapped_item.data[missing_field].value
403						else:
404							raise ValueError("map_missing must be 'abort', 'default', or a callback.")
405
406			else:
407				mapped_item = original_item
408
409			# yield a DatasetItem, which is a dict with some special properties
410			yield DatasetItem(mapper=item_mapper, original=original_item, mapped_object=mapped_item, **(mapped_item.get_item_data() if type(mapped_item) is MappedItem else mapped_item))

Generate mapped dataset items

Wrapper for _iterate_items that returns a DatasetItem, which can be accessed as a dict returning the original item or (if a mapper is available) the mapped item. Mapped or original versions of the item can also be accessed via the original and mapped_object properties of the DatasetItem.

Processors can define a method called map_item that can be used to map an item from the dataset file before it is processed any further. This is slower than storing the data file in the right format to begin with but not all data sources allow for easy 'flat' mapping of items, e.g. tweets are nested objects when retrieved from the twitter API that are easier to store as a JSON file than as a flat CSV file, and it would be a shame to throw away that data.

Note the two parameters warn_unmappable and map_missing. Items can be unmappable in that their structure is too different to coerce into a neat dictionary of the structure the data source expects. This makes it 'unmappable' and warn_unmappable determines what happens in this case. It can also be of the right structure, but with some fields missing or incomplete. map_missing determines what happens in that case. The latter is for example possible when importing data via Zeeschuimer, which produces unstably-structured data captured from social media sites.

Parameters
  • BasicProcessor processor: A reference to the processor iterating the dataset.
  • bool warn_unmappable: If an item is not mappable, skip the item and log a warning
  • map_missing: Indicates what to do with mapped items for which some fields could not be mapped. Defaults to 'default'. Must be one of:
    • 'default': fill missing fields with the default passed by map_item
    • 'abort': raise a MappedItemIncompleteException if a field is missing
    • a callback: replace missing field with the return value of the callback. The mapped item's data dictionary is passed to the callback as the first argument and the name of the missing field as the second.
    • a dictionary with a key for each possible missing field: replace missing field with a strategy for that field ('default', 'abort', or a callback)
Returns

A generator that yields DatasetItems
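
As an illustration of the map_missing parameter (a sketch, not part of the module; `dataset` is a DataSet instance and the field name 'author' is a placeholder):

    # per-field strategy: substitute a placeholder for a missing 'author' field;
    # fields not listed here fall back to the 'default' strategy
    strategies = {"author": lambda item_data, field: "[missing]"}

    for item in dataset.iterate_items(map_missing=strategies):
        mapped_value = item["author"]    # mapped value, or the placeholder above
        raw_item = item.original         # the item exactly as stored in the data file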

def get_staging_area(self):
412	def get_staging_area(self):
413		"""
414		Get path to a temporary folder in which files can be stored before
415		finishing
416
417		This folder must be created before use, but is guaranteed to not exist
418		yet. The folder may be used as a staging area for the dataset data
419		while it is being processed.
420
421		:return Path:  Path to folder
422		"""
423		results_file = self.get_results_path()
424
425		results_dir_base = results_file.parent
426		results_dir = results_file.name.replace(".", "") + "-staging"
427		results_path = results_dir_base.joinpath(results_dir)
428		index = 1
429		while results_path.exists():
430			results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
431			index += 1
432
433		# create temporary folder
434		results_path.mkdir()
435
436		# Storing the staging area with the dataset so that it can be removed later
437		self.staging_areas.append(results_path)
438
439		return results_path

Get path to a temporary folder in which files can be stored before finishing

This folder must be created before use, but is guaranteed to not exist yet. The folder may be used as a staging area for the dataset data while it is being processed.

Returns

Path to folder

def remove_staging_areas(self):
441	def remove_staging_areas(self):
442		"""
443		Remove any staging areas that were created and all files contained in them.
444		"""
445		# Remove DataSet staging areas
446		if self.staging_areas:
447			for staging_area in self.staging_areas:
448				if staging_area.is_dir():
449					shutil.rmtree(staging_area)

Remove any staging areas that were created and all files contained in them.
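
A minimal sketch of the staging-area workflow (assuming `dataset` is a DataSet instance; the file name is a placeholder):

    staging_area = dataset.get_staging_area()          # fresh, unique folder (pathlib.Path)
    staging_area.joinpath("intermediate.json").write_text("[]")

    # once processing is done, delete every staging folder created for this dataset
    dataset.remove_staging_areas()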

def finish(self, num_rows=0):
451	def finish(self, num_rows=0):
452		"""
453		Declare the dataset finished
454		"""
455		if self.data["is_finished"]:
456			raise RuntimeError("Cannot finish a finished dataset again")
457
458		self.db.update("datasets", where={"key": self.data["key"]},
459					   data={"is_finished": True, "num_rows": num_rows, "progress": 1.0, "timestamp_finished": int(time.time())})
460		self.data["is_finished"] = True
461		self.data["num_rows"] = num_rows

Declare the dataset finished

def copy(self, shallow=True):
463	def copy(self, shallow=True):
464		"""
465		Copies the dataset, making a new version with a unique key
466
467
468		:param bool shallow:  Shallow copy: does not copy the result file, but
469		instead refers to the same file as the original dataset did
470		:return Dataset:  Copied dataset
471		"""
472		parameters = self.parameters.copy()
473
474		# a key is partially based on the parameters. so by setting these extra
475		# attributes, we also ensure a unique key will be generated for the
476		# copy
477		# possibly todo: don't use time for uniqueness (but one shouldn't be
478		# copying a dataset multiple times per microsecond, that's not what
479		# this is for)
480		parameters["copied_from"] = self.key
481		parameters["copied_at"] = time.time()
482
483		copy = DataSet(parameters=parameters, db=self.db, extension=self.result_file.split(".")[-1], type=self.type, modules=self.modules)
484		for field in self.data:
485			if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
486				continue
487
488			copy.__setattr__(field, self.data[field])
489
490		if shallow:
491			# use the same result file
492			copy.result_file = self.result_file
493		else:
494			# copy to new file with new key
495			shutil.copy(self.get_results_path(), copy.get_results_path())
496
497		if self.is_finished():
498			copy.finish(self.num_rows)
499
500		# make sure ownership is also copied
501		copy.copy_ownership_from(self)
502
503		return copy

Copies the dataset, making a new version with a unique key

Parameters
  • bool shallow: Shallow copy: does not copy the result file, but instead refers to the same file as the original dataset did
Returns

Copied dataset
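
A sketch of the difference between the two copy modes (assuming `dataset` is a finished DataSet with an existing results file):

    mirror = dataset.copy(shallow=True)       # new record, same result file on disk
    duplicate = dataset.copy(shallow=False)   # new record and a copied result file
    print(duplicate.key != dataset.key)       # True: copies always receive a new key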

def delete(self, commit=True, queue=None):
505	def delete(self, commit=True, queue=None):
506		"""
507		Delete the dataset, and all its children
508
509		Deletes both database records and result files. Note that manipulating
510		a dataset object after it has been deleted is undefined behaviour.
511
512		:param bool commit:  Commit SQL DELETE query?
513		"""
514		# first, recursively delete children
515		children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
516		for child in children:
517			try:
518				child = DataSet(key=child["key"], db=self.db, modules=self.modules)
519				child.delete(commit=commit)
520			except DataSetException:
521				# dataset already deleted - race condition?
522				pass
523
524		# delete any queued jobs for this dataset
525		try:
526			job = Job.get_by_remote_ID(self.key, self.db, self.type)
527			if job.is_claimed:
528				# tell API to stop any jobs running for this dataset
529				# level 2 = cancel job
530				# we're not interested in the result - if the API is available,
531				# it will do its thing, if it's not the backend is probably not
532				# running so the job also doesn't need to be interrupted
533				call_api(
534					"cancel-job",
535					{"remote_id": self.key, "jobtype": self.type, "level": 2},
536					False
537				)
538
539			# this deletes the job from the database
540			job.finish(True)
541
542		except JobNotFoundException:
543			pass
544
545		# delete from database
546		self.db.delete("datasets", where={"key": self.key}, commit=commit)
547		self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
548		self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
549
550		# delete from drive
551		try:
552			if self.get_results_path().exists():
553				self.get_results_path().unlink()
554			if self.get_results_path().with_suffix(".log").exists():
555				self.get_results_path().with_suffix(".log").unlink()
556			if self.get_results_folder_path().exists():
557				shutil.rmtree(self.get_results_folder_path())
558
559		except FileNotFoundError:
560			# already deleted, apparently
561			pass
562		except PermissionError as e:
563			self.db.log.error(f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}")

Delete the dataset, and all its children

Deletes both database records and result files. Note that manipulating a dataset object after it has been deleted is undefined behaviour.

Parameters
  • bool commit: Commit SQL DELETE query?
def update_children(self, **kwargs):
565	def update_children(self, **kwargs):
566		"""
567		Update an attribute for all child datasets
568
569		Can be used to e.g. change the owner, version, finished status for all
570		datasets in a tree
571
572		:param kwargs:  Parameters corresponding to known dataset attributes
573		"""
574		children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
575		for child in children:
576			child = DataSet(key=child["key"], db=self.db, modules=self.modules)
577			for attr, value in kwargs.items():
578				child.__setattr__(attr, value)
579
580			child.update_children(**kwargs)

Update an attribute for all child datasets

Can be used to e.g. change the owner, version, finished status for all datasets in a tree

Parameters
  • kwargs: Parameters corresponding to known dataset attributes
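
For example, to mark every analysis in a dataset's tree as private (a sketch; `dataset` is the tree's top-level DataSet):

    dataset.update_children(is_private=True)   # applied recursively to all descendants
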
def is_finished(self):
582	def is_finished(self):
583		"""
584		Check if dataset is finished
585		:return bool:
586		"""
587		return self.data["is_finished"] == True

Check if dataset is finished

Returns
def is_rankable(self, multiple_items=True):
589	def is_rankable(self, multiple_items=True):
590		"""
591		Determine if a dataset is rankable
592
593		Rankable means that it is a CSV file with 'date' and 'value' columns
594		as well as one or more item label columns
595
596		:param bool multiple_items:  Consider datasets with multiple item columns
597		per row (e.g. word_1, word_2, etc.)?
598
599		:return bool:  Whether the dataset is rankable or not
600		"""
601		if self.get_results_path().suffix != ".csv" or not self.get_results_path().exists():
602			return False
603
604		column_options = {"date", "value", "item"}
605		if multiple_items:
606			column_options.add("word_1")
607
608		with self.get_results_path().open(encoding="utf-8") as infile:
609			reader = csv.DictReader(infile)
610			try:
611				return len(set(reader.fieldnames) & column_options) >= 3
612			except (TypeError, ValueError):
613				return False

Determine if a dataset is rankable

Rankable means that it is a CSV file with 'date' and 'value' columns as well as one or more item label columns

Parameters
  • bool multiple_items: Consider datasets with multiple item columns per row (e.g. word_1, word_2, etc.)?
Returns

Whether the dataset is rankable or not

def is_accessible_by(self, username, role='owner'):
615	def is_accessible_by(self, username, role="owner"):
616		"""
617		Check if dataset has given user as owner
618
619		:param str|User username: Username to check for
620		:return bool:
621		"""
622		if type(username) is not str:
623			if hasattr(username, "get_id"):
624				username = username.get_id()
625			else:
626				raise TypeError("User must be a str or User object")
627
628		# 'normal' owners
629		if username in [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]:
630			return True
631
632		# owners that are owner by being part of a tag
633		if username in itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if (role is None or self.owners[f"tag:{tag}"]["role"] == role)]):
634			return True
635
636		return False

Check if dataset has given user as owner

Parameters
  • str|User username: Username to check for
Returns
def get_owners_users(self, role='owner'):
638	def get_owners_users(self, role="owner"):
639		"""
640		Get list of dataset owners
641
642		This returns a list of *users* that are considered owners. Tags are
643		transparently replaced with the users with that tag.
644
645		:param str|None role:  Role to check for. If `None`, all owners are
646		returned regardless of role.
647
648		:return set:  De-duplicated owner list
649		"""
650		# 'normal' owners
651		owners = [owner for owner, meta in self.owners.items() if
652				  (role is None or meta["role"] == role) and not owner.startswith("tag:")]
653
654		# owners that are owner by being part of a tag
655		owners.extend(itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if
656									   role is None or self.owners[f"tag:{tag}"]["role"] == role]))
657
658		# de-duplicate before returning
659		return set(owners)

Get list of dataset owners

This returns a list of users that are considered owners. Tags are transparently replaced with the users with that tag.

Parameters
  • str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns

De-duplicated owner list
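
A short sketch of how ownership might be queried (the username is a placeholder; `dataset` is a DataSet instance):

    if dataset.is_accessible_by("some-user", role="owner"):
        print("some-user may modify this dataset")

    # resolve tag-based owners into concrete user names, regardless of role
    print(dataset.get_owners_users(role=None))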

def get_owners(self, role='owner'):
661	def get_owners(self, role="owner"):
662		"""
663		Get list of dataset owners
664
665		This returns a list of all owners, and does not transparently resolve
666		tags (like `get_owners_users` does).
667
668		:param str|None role:  Role to check for. If `None`, all owners are
669		returned regardless of role.
670
671		:return set:  De-duplicated owner list
672		"""
673		return [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]

Get list of dataset owners

This returns a list of all owners, and does not transparently resolve tags (like get_owners_users does).

Parameters
  • str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns

De-duplicated owner list

def add_owner(self, username, role='owner'):
675	def add_owner(self, username, role="owner"):
676		"""
677		Set dataset owner
678
679		If the user is already an owner, but with a different role, the role is
680		updated. If the user is already an owner with the same role, nothing happens.
681
682		:param str|User username:  Username to set as owner
683		:param str|None role:  Role to add user with.
684		"""
685		if type(username) is not str:
686			if hasattr(username, "get_id"):
687				username = username.get_id()
688			else:
689				raise TypeError("User must be a str or User object")
690
691		if username not in self.owners:
692			self.owners[username] = {
693				"name": username,
694				"key": self.key,
695				"role": role
696			}
697			self.db.insert("datasets_owners", data=self.owners[username], safe=True)
698
699		elif username in self.owners and self.owners[username]["role"] != role:
700			self.db.update("datasets_owners", data={"role": role}, where={"name": username, "key": self.key})
701			self.owners[username]["role"] = role
702
703		if username.startswith("tag:"):
704			# this is a bit more complicated than just adding to the list of
705			# owners, so do a full refresh
706			self.refresh_owners()
707
708		# make sure children's owners remain in sync
709		for child in self.children:
710			child.add_owner(username, role)
711			# not recursive, since we're calling it from recursive code!
712			child.copy_ownership_from(self, recursive=False)

Set dataset owner

If the user is already an owner, but with a different role, the role is updated. If the user is already an owner with the same role, nothing happens.

Parameters
  • str|User username: Username to set as owner
  • str|None role: Role to add user with.
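
As an illustration (the username and tag are placeholders):

    dataset.add_owner("some-user", role="viewer")      # a single user
    dataset.add_owner("tag:students", role="viewer")   # every user carrying this tag
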
def remove_owner(self, username):
714	def remove_owner(self, username):
715		"""
716		Remove dataset owner
717
718		If no owner is set, the dataset is assigned to the anonymous user.
719		If the user is not an owner, nothing happens.
720
721		:param str|User username:  Username to set as owner
722		"""
723		if type(username) is not str:
724			if hasattr(username, "get_id"):
725				username = username.get_id()
726			else:
727				raise TypeError("User must be a str or User object")
728
729		if username in self.owners:
730			del self.owners[username]
731			self.db.delete("datasets_owners", where={"name": username, "key": self.key})
732
733			if not self.owners:
734				self.add_owner("anonymous")
735
736		if username in self.tagged_owners:
737			del self.tagged_owners[username]
738
739		# make sure children's owners remain in sync
740		for child in self.children:
741			child.remove_owner(username)
742			# not recursive, since we're calling it from recursive code!
743			child.copy_ownership_from(self, recursive=False)

Remove dataset owner

If no owner is set, the dataset is assigned to the anonymous user. If the user is not an owner, nothing happens.

Parameters
  • str|User username: Username to set as owner
def refresh_owners(self):
745	def refresh_owners(self):
746		"""
747		Update internal owner cache
748
749		This makes sure that the list of *users* and *tags* which can access the
750		dataset is up to date.
751		"""
752		self.owners = {owner["name"]: owner for owner in self.db.fetchall("SELECT * FROM datasets_owners WHERE key = %s", (self.key,))}
753
754		# determine which users (if any) are owners of the dataset by having a
755		# tag that is listed as an owner
756		owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
757		if owner_tags:
758			tagged_owners = self.db.fetchall("SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,))
759			self.tagged_owners = {
760				owner_tag: [user["name"] for user in tagged_owners if owner_tag in user["tags"]]
761				for owner_tag in owner_tags
762			}
763		else:
764			self.tagged_owners = {}

Update internal owner cache

This makes sure that the list of users and tags which can access the dataset is up to date.

def copy_ownership_from(self, dataset, recursive=True):
766	def copy_ownership_from(self, dataset, recursive=True):
767		"""
768		Copy ownership
769
770		This is useful to e.g. make sure a dataset's ownership stays in sync
771		with its parent
772
773		:param Dataset dataset:  Parent to copy from
774		:return:
775		"""
776		self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
777
778		for role in ("owner", "viewer"):
779			owners = dataset.get_owners(role=role)
780			for owner in owners:
781				self.db.insert("datasets_owners", data={"key": self.key, "name": owner, "role": role}, commit=False, safe=True)
782
783		self.db.commit()
784		if recursive:
785			for child in self.children:
786				child.copy_ownership_from(self, recursive=recursive)

Copy ownership

This is useful to e.g. make sure a dataset's ownership stays in sync with its parent

Parameters
  • Dataset dataset: Parent to copy from
Returns
def get_parameters(self):
788	def get_parameters(self):
789		"""
790		Get dataset parameters
791
792		The dataset parameters are stored as JSON in the database - parse them
793		and return the resulting object
794
795		:return:  Dataset parameters as originally stored
796		"""
797		try:
798			return json.loads(self.data["parameters"])
799		except json.JSONDecodeError:
800			return {}

Get dataset parameters

The dataset parameters are stored as JSON in the database - parse them and return the resulting object

Returns

Dataset parameters as originally stored

def get_columns(self):
802	def get_columns(self):
803		"""
804		Returns the dataset columns.
805
806		Useful for processor input forms. Can deal with both CSV and NDJSON
807		files, the latter only if a `map_item` function is available in the
808		processor that generated it. While in other cases one could use the
809		keys of the JSON object, this is not always possible in follow-up code
810		that uses the 'column' names, so for consistency this function acts as
811		if no column can be parsed if no `map_item` function exists.
812
813		:return list:  List of dataset columns; empty list if unable to parse
814		"""
815		if not self.get_results_path().exists():
816			# no file to get columns from
817			return []
818
819		if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)):
820			items = self.iterate_items(warn_unmappable=False)
821			try:
822				keys = list(items.__next__().keys())
823			except (StopIteration, NotImplementedError):
824				# No items or otherwise unable to iterate
825				return []
826			finally:
827				del items
828			return keys
829		else:
830			# Filetype not CSV or an NDJSON with `map_item`
831			return []

Returns the dataset columns.

Useful for processor input forms. Can deal with both CSV and NDJSON files, the latter only if a map_item function is available in the processor that generated it. While in other cases one could use the keys of the JSON object, this is not always possible in follow-up code that uses the 'column' names, so for consistency this function acts as if no column can be parsed if no map_item function exists.

Returns

List of dataset columns; empty list if unable to parse
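
A minimal sketch of how the column list might be used (assuming `dataset` is a DataSet instance):

    columns = dataset.get_columns()
    if columns:
        print("Available columns:", ", ".join(columns))
    else:
        print("No columns found (missing file, or NDJSON without map_item)")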

def get_annotation_fields(self):
833	def get_annotation_fields(self):
834		"""
835		Retrieves the saved annotation fields for this dataset.
836		:return dict: The saved annotation fields.
837		"""
838
839		annotation_fields = self.db.fetchone("SELECT annotation_fields FROM datasets WHERE key = %s;", (self.top_parent().key,))
840		
841		if annotation_fields and annotation_fields.get("annotation_fields"):
842			annotation_fields = json.loads(annotation_fields["annotation_fields"])
843		else:
844			annotation_fields = {}
845
846		return annotation_fields

Retrieves the saved annotation fields for this dataset.

Returns

The saved annotation fields.

def get_annotations(self):
848	def get_annotations(self):
849		"""
850		Retrieves the annotations for this dataset.
851		:return dict: The annotations, or `None` if there are none.
852		"""
853
854		annotations = self.db.fetchone("SELECT annotations FROM annotations WHERE key = %s;", (self.top_parent().key,))
855
856		if annotations and annotations.get("annotations"):
857			return json.loads(annotations["annotations"])
858		else:
859			return None

Retrieves the annotations for this dataset.

Returns

The annotations, or None if there are none

def update_label(self, label):
861	def update_label(self, label):
862		"""
863		Update label for this dataset
864
865		:param str label:  New label
866		:return str:  The new label, as returned by get_label
867		"""
868		self.parameters["label"] = label
869
870		self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
871		return self.get_label()

Update label for this dataset

Parameters
  • str label: New label
Returns

The new label, as returned by get_label

def get_label(self, parameters=None, default='Query'):
873	def get_label(self, parameters=None, default="Query"):
874		"""
875		Generate a readable label for the dataset
876
877		:param dict parameters:  Parameters of the dataset
878		:param str default:  Label to use if it cannot be inferred from the
879		parameters
880
881		:return str:  Label
882		"""
883		if not parameters:
884			parameters = self.parameters
885
886		if parameters.get("label"):
887			return parameters["label"]
888		elif parameters.get("body_query") and parameters["body_query"] != "empty":
889			return parameters["body_query"]
890		elif parameters.get("body_match") and parameters["body_match"] != "empty":
891			return parameters["body_match"]
892		elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
893			return parameters["subject_query"]
894		elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
895			return parameters["subject_match"]
896		elif parameters.get("query"):
897			label = parameters["query"]
898			# Some legacy datasets have lists as query data
899			if isinstance(label, list):
900				label = ", ".join(label)
901
902			label = label if len(label) < 30 else label[:25] + "..."
903			label = label.strip().replace("\n", ", ")
904			return label
905		elif parameters.get("country_flag") and parameters["country_flag"] != "all":
906			return "Flag: %s" % parameters["country_flag"]
907		elif parameters.get("country_name") and parameters["country_name"] != "all":
908			return "Country: %s" % parameters["country_name"]
909		elif parameters.get("filename"):
910			return parameters["filename"]
911		elif parameters.get("board") and "datasource" in parameters:
912			return parameters["datasource"] + "/" + parameters["board"]
913		elif "datasource" in parameters and parameters["datasource"] in self.modules.datasources:
914			return self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
915		else:
916			return default

Generate a readable label for the dataset

Parameters
  • dict parameters: Parameters of the dataset
  • str default: Label to use if it cannot be inferred from the parameters
Returns

Label
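
For illustration (the label text is a placeholder; `dataset` is a DataSet instance):

    dataset.update_label("My renamed dataset")   # stored in the 'parameters' column
    print(dataset.get_label())                   # "My renamed dataset"
    # without an explicit label, get_label() falls back to the query text,
    # filename, datasource name or the supplied default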

def change_datasource(self, datasource):
918	def change_datasource(self, datasource):
919		"""
920		Change the datasource type for this dataset
921
922		:param str datasource:  New datasource type
923		:return str:  The new datasource type
924		"""
925
926		self.parameters["datasource"] = datasource
927
928		self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
929		return datasource

Change the datasource type for this dataset

Parameters
  • str datasource: New datasource type
Returns

The new datasource type

def reserve_result_file(self, parameters=None, extension='csv'):
931	def reserve_result_file(self, parameters=None, extension="csv"):
932		"""
933		Generate a unique path to the results file for this dataset
934
935		This generates a file name for the data file of this dataset, and makes sure
936		no file exists or will exist at that location other than the file we
937		expect (i.e. the data for this particular dataset).
938
939		:param str extension: File extension, "csv" by default
940		:param parameters:  Dataset parameters
941		:return bool:  Whether the file path was successfully reserved
942		"""
943		if self.data["is_finished"]:
944			raise RuntimeError("Cannot reserve results file for a finished dataset")
945
946		# Use 'random' for random post queries
947		if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
948			file = 'random-' + str(parameters["random_amount"]) + '-' + self.data["key"]
949		# Use country code for country flag queries
950		elif "country_flag" in parameters and parameters["country_flag"] != 'all':
951			file = 'countryflag-' + str(parameters["country_flag"]) + '-' + self.data["key"]
952		# Use the query string for all other queries
953		else:
954			query_bit = self.data["query"].replace(" ", "-").lower()
955			query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
956			query_bit = query_bit[:100]  # Crop to avoid OSError
957			file = query_bit + "-" + self.data["key"]
958			file = re.sub(r"[-]+", "-", file)
959
960		path = self.folder.joinpath(file + "." + extension.lower())
961		index = 1
962		while path.is_file():
963			path = self.folder.joinpath(file + "-" + str(index) + "." + extension.lower())
964			index += 1
965
966		file = path.name
967		updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
968								 data={"result_file": file})
969		self.data["result_file"] = file
970		return updated > 0

Generate a unique path to the results file for this dataset

This generates a file name for the data file of this dataset, and makes sure no file exists or will exist at that location other than the file we expect (i.e. the data for this particular dataset).

Parameters
  • str extension: File extension, "csv" by default
  • parameters: Dataset parameters
Returns

Whether the file path was successfully reserved
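
A sketch of how a data source worker might call this before writing results; the query_dataset object and its parameters are assumptions, not taken from an actual worker:

	# reserve e.g. "my-query-<key>.ndjson" (or a "-1", "-2", ... variant if that name is taken)
	if query_dataset.reserve_result_file(query_dataset.parameters, extension="ndjson"):
		results_path = query_dataset.get_results_path()  # now reflects the reserved file name
	else:
		query_dataset.update_status("Could not reserve a results file", is_final=True)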

def get_key(self, query, parameters, parent='', time_offset=0):
 972	def get_key(self, query, parameters, parent="", time_offset=0):
 973		"""
 974		Generate a unique key for this dataset that can be used to identify it
 975
 976		The key is a hash of a combination of the query string and parameters.
 977		You never need to call this, really: it's used internally.
 978
 979		:param str query:  Query string
 980		:param parameters:  Dataset parameters
 981		:param parent: Parent dataset's key (if applicable)
 982		:param time_offset:  Offset to add to the time component of the dataset
 983		key. This can be used to ensure a unique key even if the parameters and
 984		timing is otherwise identical to an existing dataset's
 985
 986		:return str:  Dataset key
 987		"""
 988		# Return a hash based on parameters
 989		# we're going to use the hash of the parameters to uniquely identify
 990		# the dataset, so make sure it's always in the same order, or we might
 991		# end up creating multiple keys for the same dataset if python
 992		# decides to return the dict in a different order
 993		param_key = collections.OrderedDict()
 994		for key in sorted(parameters):
 995			param_key[key] = parameters[key]
 996
 997		# we additionally use the current time as a salt - this should usually
 998		# ensure a unique key for the dataset. if for some reason there is a
 999		# hash collision, a new key is generated below with a random time offset
1000		param_key["_salt"] = int(time.time()) + time_offset
1001
1002		parent_key = str(parent) if parent else ""
1003		plain_key = repr(param_key) + str(query) + parent_key
1004		hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()
1005
1006		if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1007			# key exists, generate a new one
1008			return self.get_key(query, parameters, parent, time_offset=random.randint(1,10))
1009		else:
1010			return hashed_key

Generate a unique key for this dataset that can be used to identify it

The key is a hash of a combination of the query string and parameters. You never need to call this, really: it's used internally.

Parameters
  • str query: Query string
  • parameters: Dataset parameters
  • parent: Parent dataset's key (if applicable)
  • time_offset: Offset to add to the time component of the dataset key. This can be used to ensure a unique key even if the parameters and timing is otherwise identical to an existing dataset's
Returns

Dataset key
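
The derivation can be reproduced outside 4CAT; this standalone sketch mirrors the steps above (sorted parameters, a time-based salt, the query plus an empty parent key, then an MD5 digest):

	import collections
	import hashlib
	import time

	parameters = {"datasource": "custom", "query": "example"}
	param_key = collections.OrderedDict((key, parameters[key]) for key in sorted(parameters))
	param_key["_salt"] = int(time.time())  # plus a small random offset on collisions

	plain_key = repr(param_key) + "example" + ""  # query string + (empty) parent key
	print(hashlib.md5(plain_key.encode("utf-8")).hexdigest())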

def set_key(self, key):
1012	def set_key(self, key):
1013		"""
1014		Change dataset key
1015
1016		In principle, keys should never be changed. But there are rare cases
1017		where it is useful to do so, in particular when importing a dataset
1018		from another 4CAT instance; in that case it makes sense to try and
1019		ensure that the key is the same as it was before. This function sets
1020		the dataset key and updates any dataset references to it.
1021
1022		:param str key:  Key to set
1023		:return str:  Key that was set. If the desired key already exists, the
1024		original key is kept.
1025		"""
1026		key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1027		if key_exists or not key:
1028			return self.key
1029
1030		old_key = self.key
1031		self.db.update("datasets", data={"key": key}, where={"key": old_key})
1032
1033		# update references
1034		self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key})
1035		self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1036		self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1037		self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1038
1039		# for good measure
1040		self.db.commit()
1041		self.key = key
1042
1043		return self.key

Change dataset key

In principle, keys should never be changed. But there are rare cases where it is useful to do so, in particular when importing a dataset from another 4CAT instance; in that case it makes sense to try and ensure that the key is the same as it was before. This function sets the dataset key and updates any dataset references to it.

Parameters
  • str key: Key to set
Returns

Key that was set. If the desired key already exists, the original key is kept.
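
A sketch of the import scenario described above; imported_dataset and original_key are assumptions standing in for a freshly created dataset and the key it had on the exporting instance:

	new_key = imported_dataset.set_key(original_key)
	if new_key != original_key:
		# the original key was already taken on this instance
		imported_dataset.log("Could not re-use key %s; keeping %s" % (original_key, new_key))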

def get_status(self):
1045	def get_status(self):
1046		"""
1047		Get Dataset status
1048
1049		:return string: Dataset status
1050		"""
1051		return self.data["status"]

Get Dataset status

Returns

Dataset status

def update_status(self, status, is_final=False):
1053	def update_status(self, status, is_final=False):
1054		"""
1055		Update dataset status
1056
1057		The status is a string that may be displayed to a user to keep them
1058		updated and informed about the progress of a dataset. No memory is kept
1059		of earlier dataset statuses; the current status is overwritten when
1060		updated.
1061
1062		Statuses are also written to the dataset log file.
1063
1064		:param string status:  Dataset status
1065		:param bool is_final:  If this is `True`, subsequent calls to this
1066		method while the object is instantiated will not update the dataset
1067		status.
1068		:return bool:  Status update successful?
1069		"""
1070		if self.no_status_updates:
1071			return
1072
1073		# for presets, copy the updated status to the preset(s) this is part of
1074		if self.preset_parent is None:
1075			self.preset_parent = [parent for parent in self.get_genealogy() if parent.type.find("preset-") == 0 and parent.key != self.key][:1]
1076
1077		if self.preset_parent:
1078			for preset_parent in self.preset_parent:
1079				if not preset_parent.is_finished():
1080					preset_parent.update_status(status)
1081
1082		self.data["status"] = status
1083		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"status": status})
1084
1085		if is_final:
1086			self.no_status_updates = True
1087
1088		self.log(status)
1089
1090		return updated > 0

Update dataset status

The status is a string that may be displayed to a user to keep them updated and informed about the progress of a dataset. No memory is kept of earlier dataset statuses; the current status is overwritten when updated.

Statuses are also written to the dataset log file.

Parameters
  • string status: Dataset status
  • bool is_final: If this is True, subsequent calls to this method while the object is instantiated will not update the dataset status.
Returns

Status update successful?
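
A sketch of typical use inside a processing loop, combined with update_progress (documented below); dataset and items are assumptions:

	for index, item in enumerate(items):
		if index % 100 == 0:
			dataset.update_status("Processed %i of %i items" % (index, len(items)))
			dataset.update_progress(index / len(items))

	# a final status is not overwritten by later update_status() calls on this object
	dataset.update_status("Finished; %i items processed" % len(items), is_final=True)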

def update_progress(self, progress):
1092	def update_progress(self, progress):
1093		"""
1094		Update dataset progress
1095
1096		The progress can be used to indicate to a user how close the dataset
1097		is to completion.
1098
1099		:param float progress:  Between 0 and 1.
1100		:return:
1101		"""
1102		progress = min(1, max(0, progress))  # clamp
1103		if type(progress) is int:
1104			progress = float(progress)
1105
1106		self.data["progress"] = progress
1107		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress})
1108		return updated > 0

Update dataset progress

The progress can be used to indicate to a user how close the dataset is to completion.

Parameters
  • float progress: Between 0 and 1.
Returns
def get_progress(self):
1110	def get_progress(self):
1111		"""
1112		Get dataset progress
1113
1114		:return float:  Progress, between 0 and 1
1115		"""
1116		return self.data["progress"]

Get dataset progress

Returns

Progress, between 0 and 1

def finish_with_error(self, error):
1118	def finish_with_error(self, error):
1119		"""
1120		Set error as final status, and finish with 0 results
1121
1122		This is a convenience function to avoid having to repeat
1123		"update_status" and "finish" a lot.
1124
1125		:param str error:  Error message for final dataset status.
1126		:return:
1127		"""
1128		self.update_status(error, is_final=True)
1129		self.finish(0)
1130
1131		return None

Set error as final status, and finish with 0 results

This is a convenience function to avoid having to repeat "update_status" and "finish" a lot.

Parameters
  • str error: Error message for final dataset status.
Returns
def update_version(self, version):
1133	def update_version(self, version):
1134		"""
1135		Update software version used for this dataset
1136
1137		This can be used to verify the code that was used to process this dataset.
1138
1139		:param string version:  Version identifier
1140		:return bool:  Update successful?
1141		"""
1142		try:
1143			# this fails if the processor type is unknown
1144			# edge case, but let's not crash...
1145			processor_path = self.modules.processors.get(self.data["type"]).filepath
1146		except AttributeError:
1147			processor_path = ""
1148
1149		updated = self.db.update("datasets", where={"key": self.data["key"]}, data={
1150			"software_version": version[0],
1151			"software_source": version[1],
1152			"software_file": processor_path
1153		})
1154
1155		return updated > 0

Update software version used for this dataset

This can be used to verify the code that was used to process this dataset.

Parameters
  • string version: Version identifier
Returns

Update successful?

def delete_parameter(self, parameter, instant=True):
1157	def delete_parameter(self, parameter, instant=True):
1158		"""
1159		Delete a parameter from the dataset metadata
1160
1161		:param string parameter:  Parameter to delete
1162		:param bool instant:  Also delete parameters in this instance object?
1163		:return bool:  Update successful?
1164		"""
1165		parameters = self.parameters.copy()
1166		if parameter in parameters:
1167			del parameters[parameter]
1168		else:
1169			return False
1170
1171		updated = self.db.update("datasets", where={"key": self.data["key"]},
1172								 data={"parameters": json.dumps(parameters)})
1173
1174		if instant:
1175			self.parameters = parameters
1176
1177		return updated > 0

Delete a parameter from the dataset metadata

Parameters
  • string parameter: Parameter to delete
  • bool instant: Also delete parameters in this instance object?
Returns

Update successful?

def get_version_url(self, file):
1179	def get_version_url(self, file):
1180		"""
1181		Get a versioned GitHub URL for the version this dataset was processed with
1182
1183		:param file:  File to link within the repository
1184		:return:  URL, or an empty string
1185		"""
1186		if not self.data["software_source"]:
1187			return ""
1188
1189		filepath = self.data.get("software_file", "")
1190		if filepath.startswith("/extensions/"):
1191			# go to root of extension
1192			filepath = "/" + "/".join(filepath.split("/")[3:])
1193
1194		return self.data["software_source"] + "/blob/" + self.data["software_version"] + filepath

Get a versioned GitHub URL for the version this dataset was processed with

Parameters
  • file: File to link within the repository
Returns

URL, or an empty string

def top_parent(self):
1196	def top_parent(self):
1197		"""
1198		Get root dataset
1199
1200		Traverses the tree of datasets this one is part of until it finds one
1201		with no source_dataset dataset, then returns that dataset.
1202
1203		:return Dataset: Parent dataset
1204		"""
1205		genealogy = self.get_genealogy()
1206		return genealogy[0]

Get root dataset

Traverses the tree of datasets this one is part of until it finds one with no source_dataset dataset, then returns that dataset.

Returns

Parent dataset

def get_genealogy(self, inclusive=False):
1208	def get_genealogy(self, inclusive=False):
1209		"""
1210		Get genealogy of this dataset
1211
1212		Creates a list of DataSet objects, with the first one being the
1213		'top' dataset, and each subsequent one being a child of the previous
1214		one, ending with the current dataset.
1215
1216		:return list:  Dataset genealogy, oldest dataset first
1217		"""
1218		if self.genealogy and not inclusive:
1219			return self.genealogy
1220
1221		key_parent = self.key_parent
1222		genealogy = []
1223
1224		while key_parent:
1225			try:
1226				parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1227			except DataSetException:
1228				break
1229
1230			genealogy.append(parent)
1231			if parent.key_parent:
1232				key_parent = parent.key_parent
1233			else:
1234				break
1235
1236		genealogy.reverse()
1237		genealogy.append(self)
1238
1239		self.genealogy = genealogy
1240		return self.genealogy

Get genealogy of this dataset

Creates a list of DataSet objects, with the first one being the 'top' dataset, and each subsequent one being a child of the previous one, ending with the current dataset.

Returns

Dataset genealogy, oldest dataset first
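
For example, the genealogy can be turned into a simple lineage of dataset types (the types shown in the comment are purely illustrative):

	genealogy = dataset.get_genealogy()
	# oldest dataset first, the current dataset last
	print(" -> ".join(ancestor.type for ancestor in genealogy))  # e.g. "example-search -> example-filter"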

def get_all_children(self, recursive=True):
1242	def get_all_children(self, recursive=True):
1243		"""
1244		Get all children of this dataset
1245
1246		Results are returned as a non-hierarchical list, i.e. the result does
1247		not reflect the actual dataset hierarchy (but all datasets in the
1248		result will have the original dataset as an ancestor somewhere)
1249
1250		:return list:  List of DataSets
1251		"""
1252		children = [DataSet(data=record, db=self.db, modules=self.modules) for record in self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))]
1253		results = children.copy()
1254		if recursive:
1255			for child in children:
1256				results += child.get_all_children(recursive)
1257
1258		return results

Get all children of this dataset

Results are returned as a non-hierarchical list, i.e. the result does not reflect the actual dataset hierarchy (but all datasets in the result will have the original dataset as an ancestor somewhere)

Returns

List of DataSets

def nearest(self, type_filter):
1260	def nearest(self, type_filter):
1261		"""
1262		Return nearest dataset that matches the given type
1263
1264		Starting with this dataset, traverse the hierarchy upwards and return
1265		whichever dataset matches the given type.
1266
1267		:param str type_filter:  Type filter. Can contain wildcards and is matched
1268		using `fnmatch.fnmatch`.
1269		:return:  Earliest matching dataset, or `None` if none match.
1270		"""
1271		genealogy = self.get_genealogy(inclusive=True)
1272		for dataset in reversed(genealogy):
1273			if fnmatch.fnmatch(dataset.type, type_filter):
1274				return dataset
1275
1276		return None

Return nearest dataset that matches the given type

Starting with this dataset, traverse the hierarchy upwards and return whichever dataset matches the given type.

Parameters
  • str type_filter: Type filter. Can contain wildcards and is matched using fnmatch.fnmatch.
Returns

Earliest matching dataset, or None if none match.
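
For example, a processor that needs the originally collected data rather than its direct parent could look for the closest search dataset; the wildcard pattern is an illustration:

	source = dataset.nearest("*-search")  # closest ancestor produced by a search worker, if any
	if source is not None:
		print("Nearest collected dataset:", source.key)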

def get_breadcrumbs(self):
1278	def get_breadcrumbs(self):
1279		"""
1280		Get breadcrumbs navlink for use in permalinks
1281
1282		Returns a string representing this dataset's genealogy that may be used
1283		to uniquely identify it.
1284
1285		:return str: Nav link
1286		"""
1287		if self.genealogy:
1288			return ",".join([dataset.key for dataset in self.genealogy])
1289		else:
1290			# Collect keys only
1291			key_parent = self.key  # Start at the bottom
1292			genealogy = []
1293
1294			while key_parent:
1295				try:
1296					parent = self.db.fetchone("SELECT key_parent FROM datasets WHERE key = %s", (key_parent,))
1297				except TypeError:
1298					break
1299
1300				key_parent = parent["key_parent"]
1301				if key_parent:
1302					genealogy.append(key_parent)
1303				else:
1304					break
1305
1306			genealogy.reverse()
1307			genealogy.append(self.key)
1308			return ",".join(genealogy)

Get breadcrumbs navlink for use in permalinks

Returns a string representing this dataset's genealogy that may be used to uniquely identify it.

Returns

Nav link

def get_compatible_processors(self, user=None):
1310	def get_compatible_processors(self, user=None):
1311		"""
1312		Get list of processors compatible with this dataset
1313
1314		Checks whether this dataset type is one that is listed as being accepted
1315		by the processor, for each known type: if the processor does not
1316		specify accepted types (via the `is_compatible_with` method), it is
1317		assumed it accepts any top-level dataset
1318
1319		:param str|User|None user:  User to get compatibility for. If set,
1320		use the user-specific config settings where available.
1321
1322		:return dict:  Compatible processors, `name => class` mapping
1323		"""
1324		processors = self.modules.processors
1325
1326		available = {}
1327		for processor_type, processor in processors.items():
1328			if processor.is_from_collector():
1329				continue
1330
1331			own_processor = self.get_own_processor()
1332			if own_processor and own_processor.exclude_followup_processors(processor_type):
1333				continue
1334
1335			# consider a processor compatible if its is_compatible_with
1336			# method returns True *or* if it has no explicit compatibility
1337			# check and this dataset is top-level (i.e. has no parent)
1338			if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1339					or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, user=user)):
1340				available[processor_type] = processor
1341
1342		return available

Get list of processors compatible with this dataset

Checks whether this dataset type is one that is listed as being accepted by the processor, for each known type: if the processor does not specify accepted types (via the is_compatible_with method), it is assumed it accepts any top-level dataset.

Parameters
  • str|User|None user: User to get compatibility for. If set, use the user-specific config settings where available.
Returns

Compatible processors, name => class mapping
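
A short sketch listing compatible processor types for a given user context; the user value is illustrative:

	compatible = dataset.get_compatible_processors(user="some.user@example.com")
	for processor_type in sorted(compatible):
		print(processor_type)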

def get_place_in_queue(self, update=False):
1344	def get_place_in_queue(self, update=False):
1345		"""
1346		Determine dataset's position in queue
1347
1348		If the dataset is already finished, the position is -1. Else, the
1349		position is the number of datasets to be completed before this one will
1350		be processed. A position of 0 would mean that the dataset is currently
1351		being executed, or that the backend is not running.
1352
1353		:param bool update:  Update the queue position from database if True, else return cached value
1354		:return int:  Queue position
1355		"""
1356		if self.is_finished() or not self.data.get("job"):
1357			self._queue_position = -1
1358			return self._queue_position
1359		elif not update and self._queue_position is not None:
1360			# Use cached value
1361			return self._queue_position
1362		else:
1363			# Collect queue position from database via the job
1364			try:
1365				job = Job.get_by_ID(self.data["job"], self.db)
1366				self._queue_position = job.get_place_in_queue()
1367			except JobNotFoundException:
1368				self._queue_position = -1
1369
1370			return self._queue_position

Determine dataset's position in queue

If the dataset is already finished, the position is -1. Else, the position is the number of datasets to be completed before this one will be processed. A position of 0 would mean that the dataset is currently being executed, or that the backend is not running.

Parameters
  • bool update: Update the queue position from database if True, else return cached value
Returns

Queue position
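
A sketch that turns the return value into a user-facing message, following the semantics above:

	position = dataset.get_place_in_queue(update=True)
	if position == -1:
		print("Dataset is finished or not queued")
	elif position == 0:
		print("Dataset is being processed (or the backend is not running)")
	else:
		print("%i dataset(s) ahead in the queue" % position)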

def get_modules(self):
1372	def get_modules(self):
1373		"""
1374		Get 4CAT modules
1375
1376		Is a function because loading them is not free, and this way we can
1377		cache the result.
1378
1379		:return:
1380		"""
1381		if not self.modules:
1382			self.modules = ModuleCollector()
1383
1384		return self.modules

Get 4CAT modules

Is a function because loading them is not free, and this way we can cache the result.

Returns
def get_own_processor(self):
1386	def get_own_processor(self):
1387		"""
1388		Get the processor class that produced this dataset
1389
1390		:return:  Processor class, or `None` if not available.
1391		"""
1392		processor_type = self.parameters.get("type", self.data.get("type"))
1393
1394		return self.modules.processors.get(processor_type)

Get the processor class that produced this dataset

Returns

Processor class, or None if not available.

def get_available_processors(self, user=None, exclude_hidden=False):
1396	def get_available_processors(self, user=None, exclude_hidden=False):
1397		"""
1398		Get list of processors that may be run for this dataset
1399
1400		Returns all compatible processors except for those that are already
1401		queued or finished and have no options. Processors that have been
1402		run but have options are included so they may be run again with a
1403		different configuration
1404
1405		:param str|User|None user:  User to get compatibility for. If set,
1406		use the user-specific config settings where available.
1407		:param bool exclude_hidden:  Exclude processors that should not be
1408		displayed in the UI? If `False`, all processors are returned.
1409
1410		:return dict:  Available processors, `name => properties` mapping
1411		"""
1412		if self.available_processors:
1413			# Update to reflect exclude_hidden parameter which may be different from last call
1414			# TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1415			return {processor_type: processor for processor_type, processor in self.available_processors.items() if not exclude_hidden or not processor.is_hidden}
1416
1417		processors = self.get_compatible_processors(user=user)
1418
1419		for analysis in self.children:
1420			if analysis.type not in processors:
1421				continue
1422
1423			if not processors[analysis.type].get_options():
1424				del processors[analysis.type]
1425				continue
1426
1427			if exclude_hidden and processors[analysis.type].is_hidden:
1428				del processors[analysis.type]
1429
1430		self.available_processors = processors
1431		return processors

Get list of processors that may be run for this dataset

Returns all compatible processors except for those that are already queued or finished and have no options. Processors that have been run but have options are included so they may be run again with a different configuration

Parameters
  • str|User|None user: User to get compatibility for. If set, use the user-specific config settings where available.
  • bool exclude_hidden: Exclude processors that should not be displayed in the UI? If False, all processors are returned.
Returns

Available processors, name => properties mapping
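
A sketch that builds a list of processors that can still be (re)run on a dataset, hiding processors not meant for the UI; the user value is illustrative:

	available = dataset.get_available_processors(user="some.user@example.com", exclude_hidden=True)
	for processor_type, processor in available.items():
		print(processor_type, "(has options)" if processor.get_options() else "")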

def get_parent(self):
1467	def get_parent(self):
1468		"""
1469		Get parent dataset
1470
1471		:return DataSet:  Parent dataset, or `None` if not applicable
1472		"""
1473		return DataSet(key=self.key_parent, db=self.db, modules=self.modules) if self.key_parent else None

Get parent dataset

Returns

Parent dataset, or None if not applicable

def detach(self):
1475	def detach(self):
1476		"""
1477		Makes the dataset standalone, i.e. not having any source_dataset dataset
1478		"""
1479		self.link_parent("")

Makes the dataset standalone, i.e. not having any source_dataset dataset

def is_dataset(self):
1481	def is_dataset(self):
1482		"""
1483		Easy way to confirm this is a dataset.
1484		Used for checking processor and dataset compatibility,
1485		which needs to handle both processors and datasets.
1486		"""
1487		return True

Easy way to confirm this is a dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.

def is_top_dataset(self):
1489	def is_top_dataset(self):
1490		"""
1491		Easy way to confirm this is a top dataset.
1492		Used for checking processor and dataset compatibility,
1493		which needs to handle both processors and datasets.
1494		"""
1495		if self.key_parent:
1496			return False
1497		return True

Easy way to confirm this is a top dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.

def is_expiring(self, user=None):
1499	def is_expiring(self, user=None):
1500		"""
1501		Determine if dataset is set to expire
1502
1503		Similar to `is_expired`, but checks if the dataset will be deleted in
1504		the future, not if it should be deleted right now.
1505
1506		:param user:  User to use for configuration context. Provide to make
1507		sure configuration overrides for this user are taken into account.
1508		:return bool|int:  `False`, or the expiration date as a Unix timestamp.
1509		"""
1510		# has someone opted out of deleting this?
1511		if self.parameters.get("keep"):
1512			return False
1513
1514		# is this dataset explicitly marked as expiring after a certain time?
1515		if self.parameters.get("expires-after"):
1516			return self.parameters.get("expires-after")
1517
1518		# is the data source configured to have its datasets expire?
1519		expiration = config.get("datasources.expiration", {}, user=user)
1520		if not expiration.get(self.parameters.get("datasource")):
1521			return False
1522
1523		# is there a timeout for this data source?
1524		if expiration.get(self.parameters.get("datasource")).get("timeout"):
1525			return self.timestamp + expiration.get(self.parameters.get("datasource")).get("timeout")
1526
1527		return False

Determine if dataset is set to expire

Similar to is_expired, but checks if the dataset will be deleted in the future, not if it should be deleted right now.

Parameters
  • user: User to use for configuration context. Provide to make sure configuration overrides for this user are taken into account.
Returns

False, or the expiration date as a Unix timestamp.
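
A sketch that turns the return value into a readable message; the user value is illustrative and the timestamp is cast to int before formatting:

	import datetime

	expires = dataset.is_expiring(user="some.user@example.com")
	if not expires:
		print("This dataset is not scheduled for deletion")
	else:
		print("Expires around", datetime.datetime.fromtimestamp(int(expires)).isoformat())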

def is_expired(self, user=None):
1529	def is_expired(self, user=None):
1530		"""
1531		Determine if dataset should be deleted
1532
1533		Datasets can be set to expire, but when they should be deleted depends
1534		on a number of factors. This checks them all.
1535
1536		:param user:  User to use for configuration context. Provide to make
1537		sure configuration overrides for this user are taken into account.
1538		:return bool:
1539		"""
1540		# has someone opted out of deleting this?
1541		if not self.is_expiring():
1542			return False
1543
1544		# is this dataset explicitly marked as expiring after a certain time?
1545		future = time.time() + 3600  # ensure we don't delete datasets with invalid expiration times
1546		if self.parameters.get("expires-after") and convert_to_int(self.parameters["expires-after"], future) < time.time():
1547			return True
1548
1549		# is the data source configured to have its datasets expire?
1550		expiration = config.get("datasources.expiration", {}, user=user)
1551		if not expiration.get(self.parameters.get("datasource")):
1552			return False
1553
1554		# is the dataset older than the set timeout?
1555		if expiration.get(self.parameters.get("datasource")).get("timeout"):
1556			return self.timestamp + expiration[self.parameters.get("datasource")]["timeout"] < time.time()
1557
1558		return False

Determine if dataset should be deleted

Datasets can be set to expire, but when they should be deleted depends on a number of factors. This checks them all.

Parameters
  • user: User to use for configuration context. Provide to make sure configuration overrides for this user are taken into account.
Returns
def is_from_collector(self):
1560	def is_from_collector(self):
1561		"""
1562		Check if this dataset was made by a processor that collects data, i.e.
1563		a search or import worker.
1564
1565		:return bool:
1566		"""
1567		return self.type.endswith("-search") or self.type.endswith("-import")

Check if this dataset was made by a processor that collects data, i.e. a search or import worker.

Returns
def get_extension(self):
1569	def get_extension(self):
1570		"""
1571		Gets the file extension this dataset produces.
1572		Also checks whether the results file exists.
1573		Used for checking processor and dataset compatibility.
1574
1575		:return str extension:  Extension, e.g. `csv`, or False if the results file does not exist
1576		"""
1577		if self.get_results_path().exists():
1578			return self.get_results_path().suffix[1:]
1579
1580		return False

Gets the file extension this dataset produces. Also checks whether the results file exists. Used for checking processor and dataset compatibility.

Returns

Extension (e.g. csv), or False if the results file does not exist

def get_media_type(self):
1582	def get_media_type(self):
1583		"""
1584		Gets the media type of the dataset file.
1585
1586		:return str: media type, e.g., "text"
1587		"""
1588		own_processor = self.get_own_processor()
1589		if hasattr(self, "media_type"):
1590			# media type can be defined explicitly in the dataset; this is the priority
1591			return self.media_type
1592		elif own_processor is not None:
1593			# or media type can be defined in the processor
1594			# some processors can set different media types for different datasets (e.g., import_media)
1595			if hasattr(own_processor, "media_type"):
1596				return own_processor.media_type
1597
1598		# Default to text
1599		return self.parameters.get("media_type", "text")

Gets the media type of the dataset file.

Returns

media type, e.g., "text"

def get_metadata(self):
1601	def get_metadata(self):
1602		"""
1603		Get dataset metadata
1604
1605		This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
1606		as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
1607		update its database (and ensure compatibility with the exporting version of 4CAT).
1608		"""
1609		metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
1610
1611		# get 4CAT version (presumably to ensure export is compatible with import)
1612		metadata["current_4CAT_version"] = get_software_version()
1613		return metadata

Get dataset metadata

This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to update its database (and ensure compatibility with the exporting version of 4CAT).
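
A sketch of the export use case described above, writing the metadata next to the results file; the file name is illustrative:

	import json

	metadata = dataset.get_metadata()
	export_path = dataset.get_results_path().parent.joinpath(dataset.key + ".metadata.json")
	with export_path.open("w") as outfile:
		json.dump(metadata, outfile, default=str)  # default=str guards against non-serialisable values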

def get_result_url(self):
1615	def get_result_url(self):
1616		"""
1617		Gets the 4CAT frontend URL of a dataset file.
1618
1619		Uses the FlaskConfig attributes (i.e., SERVER_NAME and
1620		SERVER_HTTPS) plus hardcoded '/result/'.
1621		TODO: create more dynamic method of obtaining url.
1622		"""
1623		filename = self.get_results_path().name
1624		url_to_file = ('https://' if config.get("flask.https") else 'http://') + \
1625						config.get("flask.server_name") + '/result/' + filename
1626		return url_to_file

Gets the 4CAT frontend URL of a dataset file.

Uses the FlaskConfig attributes (i.e., SERVER_NAME and SERVER_HTTPS) plus hardcoded '/result/'. TODO: create more dynamic method of obtaining url.

def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
1628	def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
1629		"""
1630		Log an item that is unable to be mapped and warn administrators.
1631
1632		:param int item_count:			Item index
1633		:param Processor processor:		Processor calling function
1634		"""
1635		dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
1636
1637		# Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
1638		closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self
1639		# Log error to dataset log
1640		closest_dataset.log(dataset_error_message)
1641
1642		if warn_admins:
1643			if processor is not None:
1644				processor.log.warning(f"Processor {processor.type} was unable to map one or more items for dataset {closest_dataset.key}.")
1645			elif hasattr(self.db, "log"):
1646				# borrow the database's log handler
1647				self.db.log.warning(f"Unable to map one or more items for dataset {closest_dataset.key}.")
1648			else:
1649				# No other log available
1650				raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn")

Log an item that is unable to be mapped and warn administrators.

Parameters
  • int item_count: Item index
  • Processor processor: Processor calling function
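
A sketch of how an iteration loop might use this together with MapItemException; map_item and raw_items are hypothetical stand-ins for the mapping step and the raw input:

	from common.lib.exceptions import MapItemException

	for index, raw_item in enumerate(raw_items):
		try:
			item = map_item(raw_item)  # hypothetical mapping step that may raise MapItemException
		except MapItemException as e:
			dataset.warn_unmappable_item(index, processor=None, error_message=str(e))
			continue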