backend.lib.preset
Queue a series of processors at once via a preset
1""" 2Queue a series of processors at once via a preset 3""" 4import abc 5from backend.lib.processor import BasicProcessor 6 7from common.lib.dataset import DataSet 8 9 10class ProcessorPreset(BasicProcessor): 11 """ 12 Processor preset 13 """ 14 def process(self): 15 """ 16 ALL PRESETS MUST PREPEND 'preset-' TO THEIR TYPE. 17 18 This queues a series of post-processors to run in sequence, with an 19 overarching dataset to which the results of the last processor in the 20 sequence are copied. The processor pipeline is then attached to the 21 overarching dataset so it is clear that all processors were run as part 22 of that particular preset. 23 """ 24 pipeline = self.get_processor_pipeline() 25 26 pipeline = self.format_linear_pipeline(pipeline) 27 28 analysis_pipeline = DataSet( 29 parameters=pipeline[0]["parameters"], 30 db=self.db, 31 type=pipeline[0]["type"], 32 owner=self.dataset.creator, 33 is_private=self.dataset.is_private, 34 parent=self.dataset.key, 35 modules=self.modules) 36 37 # give same ownership as parent dataset 38 analysis_pipeline.copy_ownership_from(self.dataset) 39 40 # this starts the pipeline 41 self.queue.add_job(pipeline[0]["type"], remote_id=analysis_pipeline.key) 42 43 def after_process(self): 44 """ 45 Run after processing 46 47 In this case, this is run immediately after the underlying analyses 48 have been queued. This overrides the default behaviour which finishes 49 the DataSet after processing; in this case, it is left 'open' until it 50 is finished by the last underlying analysis. 51 """ 52 self.dataset.update_status("Awaiting completion of underlying analyses...") 53 self.job.finish() 54 55 @abc.abstractmethod 56 def get_processor_pipeline(self): 57 """ 58 Preset pipeline definition 59 60 Should return a list of dictionaries, each dictionary having a `type` 61 key with the processor type ID and a `parameters` key with the 62 processor parameters. The order of the list is the order in which the 63 processors are run. Compatibility of processors in the list is not 64 checked. 65 66 :return list: Processor pipeline definition 67 """ 68 pass 69 70 def format_linear_pipeline(self, pipeline): 71 """ 72 Format a linear pipeline to a nested processor parameter set 73 74 :param list pipeline: Linear pipeline 75 :return list: Nested pipeline 76 """ 77 if not pipeline: 78 raise ValueError("Pipeline is empty") 79 80 # make sure the last item in the pipeline copies to the preset's dataset 81 # also make sure there is always a "parameters" key 82 pipeline = [{"parameters": {}, **p} for p in pipeline.copy()] 83 84 pipeline[-1]["parameters"]["attach_to"] = self.dataset.key 85 86 # map the linear pipeline to a nested processor parameter set 87 while len(pipeline) > 1: 88 last = pipeline.pop() 89 pipeline[-1]["parameters"]["next"] = [last] 90 91 return pipeline 92 93class ProcessorAdvancedPreset(ProcessorPreset): 94 """ 95 Similar to ProcessorPreset, but allows for more advanced processor trees with multiple 96 branches and nested processors. 97 """ 98 def format_linear_pipeline(self, pipeline): 99 """ 100 No formatting of pipeline is needed for advanced presets 101 :param list pipeline: Linear pipeline 102 :return list: Nested pipeline 103 """ 104 return pipeline 105 106 def get_processor_pipeline(self): 107 """ 108 Preset pipeline definition 109 Should return a list of dictionaries, each dictionary having a `type` 110 key with the processor type ID and a `parameters` key with the 111 processor parameters. 
The order of the list is the order in which the 112 processors are run. Compatibility of processors in the list is not 113 """ 114 advanced_pipeline = self.get_processor_advanced_pipeline(attach_to=self.dataset.key) 115 if not advanced_pipeline: 116 raise ValueError("Pipeline is empty") 117 118 # Ensure one of the processors in the advanced pipeline has the attach_to parameter 119 all_processors = [] 120 def collect_processors(processor): 121 if "next" in processor["parameters"]: 122 for sub_processor in processor["parameters"]["next"]: 123 collect_processors(sub_processor) 124 all_processors.append(processor) 125 for processor in advanced_pipeline: 126 collect_processors(processor) 127 128 if not any("attach_to" in processor["parameters"] for processor in all_processors): 129 raise ValueError("No processor in the advanced pipeline has the attach_to parameter") 130 131 return advanced_pipeline 132 133 @abc.abstractmethod 134 def get_processor_advanced_pipeline(self, attach_to): 135 """ 136 Advanced preset pipeline definition 137 138 Similar to base class `get_processor_pipeline`, but allows for more advanced 139 processing. This allows multiple processors to be queued in parallel 140 or in a nested structure. (i.e., "next" contains a list of processors 141 to run in sequence after the each processor.) 142 Format a linear pipeline to a nested processor parameter set 143 144 "attach_to" must be added as a parameter to one of the processors. Failure 145 to do so will cause the preset to never finish. 146 :param list pipeline: Linear pipeline 147 :return list: Nested pipeline 148 """ 149 pass
`class ProcessorPreset(BasicProcessor):`
Processor preset
`def process(self):`
ALL PRESETS MUST PREPEND 'preset-' TO THEIR TYPE.
This queues a series of post-processors to run in sequence, with an overarching dataset to which the results of the last processor in the sequence are copied. The processor pipeline is then attached to the overarching dataset so it is clear that all processors were run as part of that particular preset.
`def after_process(self):`
Run after processing
In this case, this is run immediately after the underlying analyses have been queued. This overrides the default behaviour which finishes the DataSet after processing; in this case, it is left 'open' until it is finished by the last underlying analysis.
`@abc.abstractmethod`
`def get_processor_pipeline(self):`
Preset pipeline definition
Should return a list of dictionaries, each dictionary having a `type` key with the processor type ID and a `parameters` key with the processor parameters. The order of the list is the order in which the processors are run. Compatibility of processors in the list is not checked.
Returns
Processor pipeline definition
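As an illustration, a minimal preset might look like the sketch below; the class, its `type` ID, and the processor type IDs in the pipeline are hypothetical placeholders, not processors that ship with 4CAT:

```python
from backend.lib.preset import ProcessorPreset


class ExamplePreset(ProcessorPreset):
    """
    Hypothetical preset chaining two processors
    """
    type = "preset-example"  # note the mandatory 'preset-' prefix
    title = "Example preset"

    def get_processor_pipeline(self):
        # steps run in list order; the last step's result is copied
        # to the preset's own dataset
        return [
            {
                "type": "some-processor",  # hypothetical type ID
                "parameters": {"some-option": "some-value"}
            },
            {
                "type": "another-processor",  # hypothetical type ID
                "parameters": {}
            }
        ]
```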
`def format_linear_pipeline(self, pipeline):`
Format a linear pipeline to a nested processor parameter set
Parameters
- list pipeline: Linear pipeline
Returns
Nested pipeline
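Concretely, `format_linear_pipeline` folds a linear list of steps into a single nested entry: each step is tucked into its predecessor's "next" parameter, and the last step gains an "attach_to" parameter pointing at the preset's own dataset. A sketch with hypothetical processor type IDs and a placeholder dataset key:

```python
# linear input, as returned by get_processor_pipeline()
[
    {"type": "some-processor", "parameters": {"some-option": "some-value"}},
    {"type": "another-processor", "parameters": {}}
]

# nested output: later steps live under the "next" parameter of the
# step before them; "attach_to" makes the final result copy to the
# preset's overarching dataset
[
    {
        "type": "some-processor",
        "parameters": {
            "some-option": "some-value",
            "next": [
                {
                    "type": "another-processor",
                    "parameters": {"attach_to": "<preset dataset key>"}
                }
            ]
        }
    }
]
```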
`class ProcessorAdvancedPreset(ProcessorPreset):`
Similar to ProcessorPreset, but allows for more advanced processor trees with multiple branches and nested processors.
`def format_linear_pipeline(self, pipeline):`
No formatting of the pipeline is needed for advanced presets
Parameters
- list pipeline: Linear pipeline
Returns
Nested pipeline
`def get_processor_pipeline(self):`
Preset pipeline definition
Should return a list of dictionaries, each dictionary having a `type` key with the processor type ID and a `parameters` key with the processor parameters. The order of the list is the order in which the processors are run. Compatibility of processors in the list is not checked.
`@abc.abstractmethod`
`def get_processor_advanced_pipeline(self, attach_to):`
Advanced preset pipeline definition
Similar to the base class `get_processor_pipeline`, but allows for more advanced processing: multiple processors can be queued in parallel or in a nested structure (i.e., "next" contains a list of processors to run in sequence after each processor).
"attach_to" must be added as a parameter to one of the processors. Failure to do so will cause the preset to never finish.
Parameters
- str attach_to: Key of the preset's overarching dataset, to be added as the "attach_to" parameter of one of the processors
Returns
Nested pipeline
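As a sketch of what such a tree might return (processor type IDs are hypothetical), a single root processor can fan out into multiple branches via "next", with `attach_to` set on the branch that should finish the preset's overarching dataset:

```python
def get_processor_advanced_pipeline(self, attach_to):
    return [
        {
            "type": "root-processor",  # hypothetical type ID
            "parameters": {
                # both branches are queued after the root processor
                "next": [
                    {
                        # this branch finishes the preset's dataset
                        "type": "first-branch-processor",
                        "parameters": {"attach_to": attach_to}
                    },
                    {
                        # this branch runs alongside it; its result
                        # remains an ordinary child dataset
                        "type": "second-branch-processor",
                        "parameters": {}
                    }
                ]
            }
        }
    ]
```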