Edit on GitHub

backend.lib.preset

Queue a series of processors at once via a preset

  1"""
  2Queue a series of processors at once via a preset
  3"""
  4import abc
  5from backend.lib.processor import BasicProcessor
  6
  7from common.lib.dataset import DataSet
  8
  9
 10class ProcessorPreset(BasicProcessor):
 11	"""
 12	Processor preset
 13	"""	
 14	def process(self):
 15		"""
 16		ALL PRESETS MUST PREPEND 'preset-' TO THEIR TYPE.
 17
 18		This queues a series of post-processors to run in sequence, with an
 19		overarching dataset to which the results of the last processor in the
 20		sequence are copied. The processor pipeline is then attached to the
 21		overarching dataset so it is clear that all processors were run as part
 22		of that particular preset.
 23		"""
 24		pipeline = self.get_processor_pipeline()
 25
 26		pipeline = self.format_linear_pipeline(pipeline)
 27
 28		analysis_pipeline = DataSet(
 29			parameters=pipeline[0]["parameters"],
 30			db=self.db,
 31			type=pipeline[0]["type"],
 32			owner=self.dataset.creator,
 33			is_private=self.dataset.is_private,
 34			parent=self.dataset.key, 
 35			modules=self.modules)
 36		
 37		# give same ownership as parent dataset
 38		analysis_pipeline.copy_ownership_from(self.dataset)
 39
 40		# this starts the pipeline
 41		self.queue.add_job(pipeline[0]["type"], remote_id=analysis_pipeline.key)
 42
 43	def after_process(self):
 44		"""
 45		Run after processing
 46
 47		In this case, this is run immediately after the underlying analyses
 48		have been queued. This overrides the default behaviour which finishes
 49		the DataSet after processing; in this case, it is left 'open' until it
 50		is finished by the last underlying analysis.
 51		"""
 52		self.dataset.update_status("Awaiting completion of underlying analyses...")
 53		self.job.finish()
 54
 55	@abc.abstractmethod
 56	def get_processor_pipeline(self):
 57		"""
 58		Preset pipeline definition
 59
 60		Should return a list of dictionaries, each dictionary having a `type`
 61		key with the processor type ID and a `parameters` key with the
 62		processor parameters. The order of the list is the order in which the
 63		processors are run. Compatibility of processors in the list is not
 64		checked.
 65
 66		:return list: Processor pipeline definition
 67		"""
 68		pass
 69
 70	def format_linear_pipeline(self, pipeline):
 71		"""
 72		Format a linear pipeline to a nested processor parameter set
 73
 74		:param list pipeline:  Linear pipeline
 75		:return list:  Nested pipeline
 76		"""
 77		if not pipeline:
 78			raise ValueError("Pipeline is empty")
 79		
 80		# make sure the last item in the pipeline copies to the preset's dataset
 81		# also make sure there is always a "parameters" key
 82		pipeline = [{"parameters": {}, **p} for p in pipeline.copy()]
 83
 84		pipeline[-1]["parameters"]["attach_to"] = self.dataset.key
 85
 86		# map the linear pipeline to a nested processor parameter set
 87		while len(pipeline) > 1:
 88			last = pipeline.pop()
 89			pipeline[-1]["parameters"]["next"] = [last]
 90	
 91		return pipeline
 92	
 93class ProcessorAdvancedPreset(ProcessorPreset):
 94	"""
 95	Similar to ProcessorPreset, but allows for more advanced processor trees with multiple 
 96	branches and nested processors.
 97	"""
 98	def format_linear_pipeline(self, pipeline):
 99		"""
100		No formatting of pipeline is needed for advanced presets
101		:param list pipeline:  Linear pipeline
102		:return list:  Nested pipeline
103		"""
104		return pipeline
105	
106	def get_processor_pipeline(self):
107		"""
108		Preset pipeline definition
109		Should return a list of dictionaries, each dictionary having a `type`
110		key with the processor type ID and a `parameters` key with the
111		processor parameters. The order of the list is the order in which the
112		processors are run. Compatibility of processors in the list is not
113		"""
114		advanced_pipeline = self.get_processor_advanced_pipeline(attach_to=self.dataset.key)
115		if not advanced_pipeline:
116			raise ValueError("Pipeline is empty")
117		
118		# Ensure one of the processors in the advanced pipeline has the attach_to parameter
119		all_processors = []
120		def collect_processors(processor):
121			if "next" in processor["parameters"]:
122				for sub_processor in processor["parameters"]["next"]:
123					collect_processors(sub_processor)
124			all_processors.append(processor)
125		for processor in advanced_pipeline:
126			collect_processors(processor)
127		
128		if not any("attach_to" in processor["parameters"] for processor in all_processors):
129			raise ValueError("No processor in the advanced pipeline has the attach_to parameter")
130
131		return advanced_pipeline
132	
133	@abc.abstractmethod
134	def get_processor_advanced_pipeline(self, attach_to):
135		"""
136		Advanced preset pipeline definition
137
138		Similar to base class `get_processor_pipeline`, but allows for more advanced
139		processing. This allows multiple processors to be queued in parallel
140		or in a nested structure. (i.e., "next" contains a list of processors
141		to run in sequence after the each processor.)
142		Format a linear pipeline to a nested processor parameter set
143
144		"attach_to" must be added as a parameter to one of the processors. Failure
145		to do so will cause the preset to never finish.
146		:param list pipeline:  Linear pipeline
147		:return list:  Nested pipeline
148		"""
149		pass
class ProcessorPreset(backend.lib.processor.BasicProcessor):
11class ProcessorPreset(BasicProcessor):
12	"""
13	Processor preset
14	"""	
15	def process(self):
16		"""
17		ALL PRESETS MUST PREPEND 'preset-' TO THEIR TYPE.
18
19		This queues a series of post-processors to run in sequence, with an
20		overarching dataset to which the results of the last processor in the
21		sequence are copied. The processor pipeline is then attached to the
22		overarching dataset so it is clear that all processors were run as part
23		of that particular preset.
24		"""
25		pipeline = self.get_processor_pipeline()
26
27		pipeline = self.format_linear_pipeline(pipeline)
28
29		analysis_pipeline = DataSet(
30			parameters=pipeline[0]["parameters"],
31			db=self.db,
32			type=pipeline[0]["type"],
33			owner=self.dataset.creator,
34			is_private=self.dataset.is_private,
35			parent=self.dataset.key, 
36			modules=self.modules)
37		
38		# give same ownership as parent dataset
39		analysis_pipeline.copy_ownership_from(self.dataset)
40
41		# this starts the pipeline
42		self.queue.add_job(pipeline[0]["type"], remote_id=analysis_pipeline.key)
43
44	def after_process(self):
45		"""
46		Run after processing
47
48		In this case, this is run immediately after the underlying analyses
49		have been queued. This overrides the default behaviour which finishes
50		the DataSet after processing; in this case, it is left 'open' until it
51		is finished by the last underlying analysis.
52		"""
53		self.dataset.update_status("Awaiting completion of underlying analyses...")
54		self.job.finish()
55
56	@abc.abstractmethod
57	def get_processor_pipeline(self):
58		"""
59		Preset pipeline definition
60
61		Should return a list of dictionaries, each dictionary having a `type`
62		key with the processor type ID and a `parameters` key with the
63		processor parameters. The order of the list is the order in which the
64		processors are run. Compatibility of processors in the list is not
65		checked.
66
67		:return list: Processor pipeline definition
68		"""
69		pass
70
71	def format_linear_pipeline(self, pipeline):
72		"""
73		Format a linear pipeline to a nested processor parameter set
74
75		:param list pipeline:  Linear pipeline
76		:return list:  Nested pipeline
77		"""
78		if not pipeline:
79			raise ValueError("Pipeline is empty")
80		
81		# make sure the last item in the pipeline copies to the preset's dataset
82		# also make sure there is always a "parameters" key
83		pipeline = [{"parameters": {}, **p} for p in pipeline.copy()]
84
85		pipeline[-1]["parameters"]["attach_to"] = self.dataset.key
86
87		# map the linear pipeline to a nested processor parameter set
88		while len(pipeline) > 1:
89			last = pipeline.pop()
90			pipeline[-1]["parameters"]["next"] = [last]
91	
92		return pipeline

Processor preset

def process(self):
15	def process(self):
16		"""
17		ALL PRESETS MUST PREPEND 'preset-' TO THEIR TYPE.
18
19		This queues a series of post-processors to run in sequence, with an
20		overarching dataset to which the results of the last processor in the
21		sequence are copied. The processor pipeline is then attached to the
22		overarching dataset so it is clear that all processors were run as part
23		of that particular preset.
24		"""
25		pipeline = self.get_processor_pipeline()
26
27		pipeline = self.format_linear_pipeline(pipeline)
28
29		analysis_pipeline = DataSet(
30			parameters=pipeline[0]["parameters"],
31			db=self.db,
32			type=pipeline[0]["type"],
33			owner=self.dataset.creator,
34			is_private=self.dataset.is_private,
35			parent=self.dataset.key, 
36			modules=self.modules)
37		
38		# give same ownership as parent dataset
39		analysis_pipeline.copy_ownership_from(self.dataset)
40
41		# this starts the pipeline
42		self.queue.add_job(pipeline[0]["type"], remote_id=analysis_pipeline.key)

ALL PRESETS MUST PREPEND 'preset-' TO THEIR TYPE.

This queues a series of post-processors to run in sequence, with an overarching dataset to which the results of the last processor in the sequence are copied. The processor pipeline is then attached to the overarching dataset so it is clear that all processors were run as part of that particular preset.

def after_process(self):
44	def after_process(self):
45		"""
46		Run after processing
47
48		In this case, this is run immediately after the underlying analyses
49		have been queued. This overrides the default behaviour which finishes
50		the DataSet after processing; in this case, it is left 'open' until it
51		is finished by the last underlying analysis.
52		"""
53		self.dataset.update_status("Awaiting completion of underlying analyses...")
54		self.job.finish()

Run after processing

In this case, this is run immediately after the underlying analyses have been queued. This overrides the default behaviour which finishes the DataSet after processing; in this case, it is left 'open' until it is finished by the last underlying analysis.

@abc.abstractmethod
def get_processor_pipeline(self):
56	@abc.abstractmethod
57	def get_processor_pipeline(self):
58		"""
59		Preset pipeline definition
60
61		Should return a list of dictionaries, each dictionary having a `type`
62		key with the processor type ID and a `parameters` key with the
63		processor parameters. The order of the list is the order in which the
64		processors are run. Compatibility of processors in the list is not
65		checked.
66
67		:return list: Processor pipeline definition
68		"""
69		pass

Preset pipeline definition

Should return a list of dictionaries, each dictionary having a type key with the processor type ID and a parameters key with the processor parameters. The order of the list is the order in which the processors are run. Compatibility of processors in the list is not checked.

Returns

Processor pipeline definition

def format_linear_pipeline(self, pipeline):
71	def format_linear_pipeline(self, pipeline):
72		"""
73		Format a linear pipeline to a nested processor parameter set
74
75		:param list pipeline:  Linear pipeline
76		:return list:  Nested pipeline
77		"""
78		if not pipeline:
79			raise ValueError("Pipeline is empty")
80		
81		# make sure the last item in the pipeline copies to the preset's dataset
82		# also make sure there is always a "parameters" key
83		pipeline = [{"parameters": {}, **p} for p in pipeline.copy()]
84
85		pipeline[-1]["parameters"]["attach_to"] = self.dataset.key
86
87		# map the linear pipeline to a nested processor parameter set
88		while len(pipeline) > 1:
89			last = pipeline.pop()
90			pipeline[-1]["parameters"]["next"] = [last]
91	
92		return pipeline

Format a linear pipeline to a nested processor parameter set

Parameters
  • list pipeline: Linear pipeline
Returns

Nested pipeline

class ProcessorAdvancedPreset(ProcessorPreset):
 94class ProcessorAdvancedPreset(ProcessorPreset):
 95	"""
 96	Similar to ProcessorPreset, but allows for more advanced processor trees with multiple 
 97	branches and nested processors.
 98	"""
 99	def format_linear_pipeline(self, pipeline):
100		"""
101		No formatting of pipeline is needed for advanced presets
102		:param list pipeline:  Linear pipeline
103		:return list:  Nested pipeline
104		"""
105		return pipeline
106	
107	def get_processor_pipeline(self):
108		"""
109		Preset pipeline definition
110		Should return a list of dictionaries, each dictionary having a `type`
111		key with the processor type ID and a `parameters` key with the
112		processor parameters. The order of the list is the order in which the
113		processors are run. Compatibility of processors in the list is not
114		"""
115		advanced_pipeline = self.get_processor_advanced_pipeline(attach_to=self.dataset.key)
116		if not advanced_pipeline:
117			raise ValueError("Pipeline is empty")
118		
119		# Ensure one of the processors in the advanced pipeline has the attach_to parameter
120		all_processors = []
121		def collect_processors(processor):
122			if "next" in processor["parameters"]:
123				for sub_processor in processor["parameters"]["next"]:
124					collect_processors(sub_processor)
125			all_processors.append(processor)
126		for processor in advanced_pipeline:
127			collect_processors(processor)
128		
129		if not any("attach_to" in processor["parameters"] for processor in all_processors):
130			raise ValueError("No processor in the advanced pipeline has the attach_to parameter")
131
132		return advanced_pipeline
133	
134	@abc.abstractmethod
135	def get_processor_advanced_pipeline(self, attach_to):
136		"""
137		Advanced preset pipeline definition
138
139		Similar to base class `get_processor_pipeline`, but allows for more advanced
140		processing. This allows multiple processors to be queued in parallel
141		or in a nested structure. (i.e., "next" contains a list of processors
142		to run in sequence after the each processor.)
143		Format a linear pipeline to a nested processor parameter set
144
145		"attach_to" must be added as a parameter to one of the processors. Failure
146		to do so will cause the preset to never finish.
147		:param list pipeline:  Linear pipeline
148		:return list:  Nested pipeline
149		"""
150		pass

Similar to ProcessorPreset, but allows for more advanced processor trees with multiple branches and nested processors.

def format_linear_pipeline(self, pipeline):
 99	def format_linear_pipeline(self, pipeline):
100		"""
101		No formatting of pipeline is needed for advanced presets
102		:param list pipeline:  Linear pipeline
103		:return list:  Nested pipeline
104		"""
105		return pipeline

No formatting of pipeline is needed for advanced presets

Parameters
  • list pipeline: Linear pipeline
Returns

Nested pipeline

def get_processor_pipeline(self):
107	def get_processor_pipeline(self):
108		"""
109		Preset pipeline definition
110		Should return a list of dictionaries, each dictionary having a `type`
111		key with the processor type ID and a `parameters` key with the
112		processor parameters. The order of the list is the order in which the
113		processors are run. Compatibility of processors in the list is not
114		"""
115		advanced_pipeline = self.get_processor_advanced_pipeline(attach_to=self.dataset.key)
116		if not advanced_pipeline:
117			raise ValueError("Pipeline is empty")
118		
119		# Ensure one of the processors in the advanced pipeline has the attach_to parameter
120		all_processors = []
121		def collect_processors(processor):
122			if "next" in processor["parameters"]:
123				for sub_processor in processor["parameters"]["next"]:
124					collect_processors(sub_processor)
125			all_processors.append(processor)
126		for processor in advanced_pipeline:
127			collect_processors(processor)
128		
129		if not any("attach_to" in processor["parameters"] for processor in all_processors):
130			raise ValueError("No processor in the advanced pipeline has the attach_to parameter")
131
132		return advanced_pipeline

Preset pipeline definition Should return a list of dictionaries, each dictionary having a type key with the processor type ID and a parameters key with the processor parameters. The order of the list is the order in which the processors are run. Compatibility of processors in the list is not

@abc.abstractmethod
def get_processor_advanced_pipeline(self, attach_to):
134	@abc.abstractmethod
135	def get_processor_advanced_pipeline(self, attach_to):
136		"""
137		Advanced preset pipeline definition
138
139		Similar to base class `get_processor_pipeline`, but allows for more advanced
140		processing. This allows multiple processors to be queued in parallel
141		or in a nested structure. (i.e., "next" contains a list of processors
142		to run in sequence after the each processor.)
143		Format a linear pipeline to a nested processor parameter set
144
145		"attach_to" must be added as a parameter to one of the processors. Failure
146		to do so will cause the preset to never finish.
147		:param list pipeline:  Linear pipeline
148		:return list:  Nested pipeline
149		"""
150		pass

Advanced preset pipeline definition

Similar to base class get_processor_pipeline, but allows for more advanced processing. This allows multiple processors to be queued in parallel or in a nested structure. (i.e., "next" contains a list of processors to run in sequence after the each processor.) Format a linear pipeline to a nested processor parameter set

"attach_to" must be added as a parameter to one of the processors. Failure to do so will cause the preset to never finish.

Parameters
  • list pipeline: Linear pipeline
Returns

Nested pipeline