datasources.upload.import_csv
Custom data upload to create bespoke datasets
1""" 2Custom data upload to create bespoke datasets 3""" 4import secrets 5import hashlib 6import time 7import csv 8import re 9import io 10 11import datasources.upload.import_formats as import_formats 12 13from dateutil.parser import parse as parse_datetime 14from datetime import datetime 15 16from backend.lib.processor import BasicProcessor 17from common.lib.exceptions import QueryParametersException, QueryNeedsFurtherInputException, \ 18 QueryNeedsExplicitConfirmationException, CsvDialectException 19from common.lib.helpers import strip_tags, sniff_encoding, UserInput, HashCache 20 21 22class SearchCustom(BasicProcessor): 23 type = "upload-search" # job ID 24 category = "Search" # category 25 title = "Custom Dataset Upload" # title displayed in UI 26 description = "Upload your own CSV file to be used as a dataset" # description displayed in UI 27 extension = "csv" # extension of result file, used internally and in UI 28 is_local = False # Whether this datasource is locally scraped 29 is_static = False # Whether this datasource is still updated 30 31 max_workers = 1 32 33 @classmethod 34 def get_options(cls, parent_dataset=None, config=None) -> dict: 35 """ 36 Get processor options 37 38 :param parent_dataset DataSet: An object representing the dataset that 39 the processor would be or was run on. Can be used, in conjunction with 40 config, to show some options only to privileged users. 41 :param config ConfigManager|None config: Configuration reader (context-aware) 42 :return dict: Options for this processor 43 """ 44 return { 45 "intro": { 46 "type": UserInput.OPTION_INFO, 47 "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis " 48 "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must " 49 "contain a header row.\n\n" 50 "You can indicate what format the file has or upload one with arbitrary structure. 
In the latter " 51 "case, for each item, columns describing its ID, author, timestamp, and content are expected. You " 52 "can select which column holds which value after uploading the file." 53 }, 54 "data_upload": { 55 "type": UserInput.OPTION_FILE, 56 "help": "File" 57 }, 58 "format": { 59 "type": UserInput.OPTION_CHOICE, 60 "help": "CSV format", 61 "options": { 62 tool: info["name"] for tool, info in import_formats.tools.items() 63 }, 64 "default": "custom" 65 }, 66 "strip_html": { 67 "type": UserInput.OPTION_TOGGLE, 68 "help": "Strip HTML?", 69 "default": False, 70 "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)" 71 } 72 } 73 74 def process(self): 75 """ 76 Process uploaded CSV file 77 78 Applies the provided mapping and makes sure the file is in a format 79 4CAT will understand. 80 """ 81 tool_format = import_formats.tools.get(self.parameters.get("format")) 82 temp_file = self.dataset.get_results_path().with_suffix(".importing") 83 with temp_file.open("rb") as infile: 84 # detect encoding - UTF-8 with or without BOM 85 encoding = sniff_encoding(infile) 86 87 # figure out the csv dialect 88 # the sniffer is not perfect and sometimes makes mistakes 89 # for some formats we already know the dialect, so we can override its 90 # guess and set the properties as defined in import_formats.py 91 infile = temp_file.open("r", encoding=encoding) 92 sample = infile.read(1024 * 1024) 93 try: 94 possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))] 95 except csv.Error: 96 possible_dialects = csv.list_dialects() 97 if tool_format.get("csv_dialect", {}): 98 # Known dialects are defined in import_formats.py 99 dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t")) 100 for prop in tool_format.get("csv_dialect", {}): 101 setattr(dialect, prop, tool_format["csv_dialect"][prop]) 102 possible_dialects.append(dialect) 103 104 while possible_dialects: 105 # With validated csvs, save as is 
but make sure the raw file is sorted 106 infile.seek(0) 107 dialect = possible_dialects.pop() # Use the last dialect first 108 self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}") 109 reader = csv.DictReader(infile, dialect=dialect) 110 111 if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \ 112 set(tool_format["columns"]) != set(tool_format["columns"]): 113 raise QueryParametersException("Not all columns are present") 114 115 # hasher for pseudonymisation 116 salt = secrets.token_bytes(16) 117 hasher = hashlib.blake2b(digest_size=24, salt=salt) 118 hash_cache = HashCache(hasher) 119 120 # write the resulting dataset 121 writer = None 122 done = 0 123 skipped = 0 124 timestamp_missing = 0 125 with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv: 126 # mapper is defined in import_formats 127 try: 128 for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)): 129 if isinstance(item, import_formats.InvalidImportedItem): 130 # if the mapper returns this class, the item is not written 131 skipped += 1 132 if hasattr(item, "reason"): 133 self.dataset.log(f"Skipping item ({item.reason})") 134 continue 135 136 if not writer: 137 writer = csv.DictWriter(output_csv, fieldnames=list(item.keys())) 138 writer.writeheader() 139 140 if self.parameters.get("strip_html") and "body" in item: 141 item["body"] = strip_tags(item["body"]) 142 143 # check for None/empty timestamp 144 if not item.get("timestamp"): 145 # Notify the user that items are missing a timestamp 146 timestamp_missing += 1 147 self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.") 148 149 # pseudonymise or anonymise as needed 150 filtering = self.parameters.get("pseudonymise") 151 try: 152 if filtering: 153 for field, value in item.items(): 154 if field is None: 155 # This would normally be caught 
when writerow is called 156 raise CsvDialectException("Field is None") 157 if field.startswith("author"): 158 if filtering == "anonymise": 159 item[field] = "REDACTED" 160 elif filtering == "pseudonymise": 161 item[field] = hash_cache.update_cache(value) 162 163 writer.writerow(item) 164 except ValueError as e: 165 if not possible_dialects: 166 self.dataset.log(f"Error ({e}) writing item {i}: {item}") 167 return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct " 168 "format or edited the CSV after exporting? Try importing " 169 "as custom format.") 170 else: 171 raise CsvDialectException(f"Error ({e}) writing item {i}: {item}") 172 173 done += 1 174 175 except import_formats.InvalidCustomFormat as e: 176 self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset " 177 "log for details.") 178 infile.close() 179 temp_file.unlink() 180 return self.dataset.finish_with_error(str(e)) 181 182 except UnicodeDecodeError: 183 infile.close() 184 temp_file.unlink() 185 return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. " 186 "Make sure the file is encoded properly and try again.") 187 188 except CsvDialectException: 189 self.dataset.log(f"Error with CSV dialect: {vars(dialect)}") 190 continue 191 192 # done! 193 infile.close() 194 # We successfully read the CSV, no need to try other dialects 195 break 196 197 if skipped or timestamp_missing: 198 error_message = "" 199 if timestamp_missing: 200 error_message += f"{timestamp_missing:,} items had no timestamp" 201 if skipped: 202 error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format" 203 204 self.dataset.update_status( 205 f"CSV file imported, but {error_message}. 
See dataset log for details.", 206 is_final=True) 207 208 temp_file.unlink() 209 self.dataset.delete_parameter("filename") 210 if skipped and not done: 211 self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing " 212 "the item's timestamp may be in a format that cannot be parsed properly.") 213 else: 214 self.dataset.finish(done) 215 216 def validate_query(query, request, config): 217 """ 218 Validate custom data input 219 220 Confirms that the uploaded file is a valid CSV or tab file and, if so, returns 221 some metadata. 222 223 :param dict query: Query parameters, from client-side. 224 :param request: Flask request 225 :param ConfigManager|None config: Configuration reader (context-aware) 226 :return dict: Safe query parameters 227 """ 228 # do we have an uploaded file? 229 if "option-data_upload" not in request.files: 230 raise QueryParametersException("No file was offered for upload.") 231 232 file = request.files["option-data_upload"] 233 if not file: 234 raise QueryParametersException("No file was offered for upload.") 235 236 if query.get("format") not in import_formats.tools: 237 raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}") 238 239 # content_length seems unreliable, so figure out the length by reading 240 # the file... 
241 upload_size = 0 242 while True: 243 bit = file.read(1024) 244 if len(bit) == 0: 245 break 246 upload_size += len(bit) 247 248 file.seek(0) 249 encoding = sniff_encoding(file) 250 tool_format = import_formats.tools.get(query.get("format")) 251 252 253 try: 254 # try reading the file as csv here 255 # never read more than 128 kB (to keep it quick) 256 sample_size = min(upload_size, 128 * 1024) # 128 kB is sent from the frontend at most 257 wrapped_file = io.TextIOWrapper(file, encoding=encoding) 258 sample = wrapped_file.read(sample_size) 259 260 # sometimes more is actually worse, and the sniffer gets confused 261 # so as a back-up sample, use just the header row, which might give 262 # results if the full sample fails 263 samples = [sample, sample.split("\n")[0]] 264 265 if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"): 266 # this may be intended, or the check may be bad, so allow user to continue 267 raise QueryNeedsExplicitConfirmationException( 268 "The uploaded file does not seem to have a header row. Continue anyway?") 269 270 wrapped_file.seek(0) 271 errors = [] 272 dialect = None 273 while samples: 274 sample = samples.pop(0) 275 try: 276 dialect = csv.Sniffer().sniff(sample, delimiters=",;\t") 277 except csv.Error as e: 278 errors.append(str(e)) 279 # try next sample 280 continue 281 282 if not dialect: 283 raise csv.Error(", ".join(errors)) 284 285 # override the guesses for specific formats if defined so in 286 # import_formats.py 287 for prop in tool_format.get("csv_dialect", {}): 288 setattr(dialect, prop, tool_format["csv_dialect"][prop]) 289 290 except UnicodeDecodeError: 291 raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. 
" 292 "Save the file in the proper format and try again.") 293 except csv.Error: 294 raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.") 295 296 # With validated csvs, save as is but make sure the raw file is sorted 297 reader = csv.DictReader(wrapped_file, dialect=dialect) 298 299 # we know that the CSV file is a CSV file now, next verify whether 300 # we know what each column means 301 try: 302 fields = reader.fieldnames 303 except UnicodeDecodeError: 304 raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.") 305 306 incomplete_mapping = list(tool_format["columns"]) 307 for field in tool_format["columns"]: 308 if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form: 309 incomplete_mapping.remove(field) 310 elif not tool_format.get("allow_user_mapping", False) and field in fields: 311 incomplete_mapping.remove(field) 312 313 # offer the user a number of select boxes where they can indicate the 314 # mapping for each column 315 column_mapping = {} 316 if tool_format.get("allow_user_mapping", False): 317 magic_mappings = { 318 "id": {"__4cat_auto_sequence": "[generate sequential IDs]"}, 319 "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"}, 320 "empty": {"__4cat_empty_value": "[empty]"}, 321 "timestamp": {"__4cat_now": "[current date and time]"} 322 } 323 if incomplete_mapping: 324 raise QueryNeedsFurtherInputException({ 325 "mapping-info": { 326 "type": UserInput.OPTION_INFO, 327 "help": "Please confirm which column in the CSV file maps to each required value." 
328 }, 329 **{ 330 "mapping-%s" % mappable_column: { 331 "type": UserInput.OPTION_CHOICE, 332 "options": { 333 "": "", 334 **magic_mappings.get(mappable_column, magic_mappings["empty"]), 335 **{column: column for column in fields} 336 }, 337 "default": mappable_column if mappable_column in fields else "", 338 "help": mappable_column, 339 "tooltip": tool_format["columns"][mappable_column] 340 } for mappable_column in incomplete_mapping 341 }}) 342 343 # the mappings do need to point to a column in the csv file 344 missing_mapping = [] 345 for field in tool_format["columns"]: 346 mapping_field = "option-mapping-%s" % field 347 provided_field = request.form.get(mapping_field) 348 if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field: 349 missing_mapping.append(field) 350 else: 351 column_mapping["mapping-" + field] = request.form.get(mapping_field) 352 353 if missing_mapping: 354 raise QueryParametersException( 355 "You need to indicate which column in the CSV file holds the corresponding value for the following " 356 "columns: %s" % ", ".join(missing_mapping)) 357 358 elif incomplete_mapping: 359 raise QueryParametersException("The CSV file does not contain all required columns. 
The following columns " 360 "are missing: %s" % ", ".join(incomplete_mapping)) 361 362 # the timestamp column needs to be parseable 363 timestamp_column = request.form.get("mapping-timestamp") 364 try: 365 row = reader.__next__() 366 if timestamp_column not in row: 367 # incomplete row because we are analysing a sample 368 # stop parsing because no complete rows will follow 369 raise StopIteration 370 371 if row[timestamp_column]: 372 try: 373 if row[timestamp_column].isdecimal(): 374 datetime.fromtimestamp(float(row[timestamp_column])) 375 else: 376 parse_datetime(row[timestamp_column]) 377 except (ValueError, OSError): 378 raise QueryParametersException( 379 "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)") 380 except AttributeError: 381 raise QueryParametersException("Couldn't correctly read the file, try formatting it differently") 382 else: 383 # the timestamp column is empty or contains empty values 384 if not query.get("frontend-confirm"): 385 # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped 386 # TODO: frontend-confirm exceptions need to be made unique 387 raise QueryNeedsExplicitConfirmationException( 388 "Your 'timestamp' column contains empty values. 
Continue anyway?") 389 else: 390 # `None` value will be used 391 pass 392 393 except StopIteration: 394 pass 395 396 # ok, we're done with the file 397 wrapped_file.detach() 398 399 # Whether to strip the HTML tags 400 strip_html = False 401 if query.get("strip_html"): 402 strip_html = True 403 404 # return metadata - the filename is sanitised and serves no purpose at 405 # this point in time, but can be used to uniquely identify a dataset 406 disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]") 407 return { 408 "filename": disallowed_characters.sub("", file.filename), 409 "time": time.time(), 410 "datasource": "upload", 411 "board": query.get("format", "custom").replace("_", "-"), 412 "format": query.get("format"), 413 "strip_html": strip_html, 414 **column_mapping, 415 } 416 417 def after_create(query, dataset, request): 418 """ 419 Hook to execute after the dataset for this source has been created 420 421 In this case, put the file in a temporary location so it can be 422 processed properly by the related Job later. 423 424 :param dict query: Sanitised query parameters 425 :param DataSet dataset: Dataset created for this query 426 :param request: Flask request submitted for its creation 427 """ 428 file = request.files["option-data_upload"] 429 file.seek(0) 430 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 431 while True: 432 chunk = file.read(1024) 433 if len(chunk) == 0: 434 break 435 outfile.write(chunk)
class SearchCustom(BasicProcessor):
    """
    Data source that turns a user-uploaded CSV/TAB file into a 4CAT dataset.

    The upload is validated (and, for the "custom" format, column-mapped) in
    validate_query, stashed to a temporary ".importing" file in after_create,
    and converted into the final result CSV in process.
    """
    type = "upload-search"  # job ID
    category = "Search"  # category
    title = "Custom Dataset Upload"  # title displayed in UI
    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
    extension = "csv"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1

    @classmethod
    def get_options(cls, parent_dataset=None, config=None) -> dict:
        """
        Get processor options

        :param parent_dataset DataSet: An object representing the dataset that
        the processor would be or was run on. Can be used, in conjunction with
        config, to show some options only to privileged users.
        :param ConfigManager|None config: Configuration reader (context-aware)
        :return dict: Options for this processor
        """
        return {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
                        "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
                        "contain a header row.\n\n"
                        "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
                        "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
                        "can select which column holds which value after uploading the file."
            },
            "data_upload": {
                "type": UserInput.OPTION_FILE,
                "help": "File"
            },
            "format": {
                "type": UserInput.OPTION_CHOICE,
                "help": "CSV format",
                "options": {
                    tool: info["name"] for tool, info in import_formats.tools.items()
                },
                "default": "custom"
            },
            "strip_html": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Strip HTML?",
                "default": False,
                "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
            }
        }

    def process(self):
        """
        Process uploaded CSV file

        Applies the provided mapping and makes sure the file is in a format
        4CAT will understand. Candidate CSV dialects are tried one by one;
        if a dialect fails while writing, the next candidate is tried until
        none are left.
        """
        tool_format = import_formats.tools.get(self.parameters.get("format"))
        temp_file = self.dataset.get_results_path().with_suffix(".importing")
        with temp_file.open("rb") as infile:
            # detect encoding - UTF-8 with or without BOM
            encoding = sniff_encoding(infile)

        # figure out the csv dialect
        # the sniffer is not perfect and sometimes makes mistakes
        # for some formats we already know the dialect, so we can override its
        # guess and set the properties as defined in import_formats.py
        infile = temp_file.open("r", encoding=encoding)
        sample = infile.read(1024 * 1024)  # sniff on at most 1 MB
        try:
            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
        except csv.Error:
            # sniffing failed entirely; fall back to every registered dialect
            possible_dialects = csv.list_dialects()
        if tool_format.get("csv_dialect", {}):
            # Known dialects are defined in import_formats.py
            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])
            # appended last, so it is popped (tried) first below
            possible_dialects.append(dialect)

        while possible_dialects:
            # With validated csvs, save as is but make sure the raw file is sorted
            infile.seek(0)
            dialect = possible_dialects.pop()  # Use the last dialect first
            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
            reader = csv.DictReader(infile, dialect=dialect)

            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
                    set(tool_format["columns"]) != set(tool_format["columns"]):
                raise QueryParametersException("Not all columns are present")

            # hasher for pseudonymisation
            salt = secrets.token_bytes(16)
            hasher = hashlib.blake2b(digest_size=24, salt=salt)
            hash_cache = HashCache(hasher)

            # write the resulting dataset
            writer = None
            done = 0
            skipped = 0
            timestamp_missing = 0
            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
                # mapper is defined in import_formats
                try:
                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
                        if isinstance(item, import_formats.InvalidImportedItem):
                            # if the mapper returns this class, the item is not written
                            skipped += 1
                            if hasattr(item, "reason"):
                                self.dataset.log(f"Skipping item ({item.reason})")
                            continue

                        if not writer:
                            # field names come from the first mapped item
                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
                            writer.writeheader()

                        if self.parameters.get("strip_html") and "body" in item:
                            item["body"] = strip_tags(item["body"])

                        # check for None/empty timestamp
                        if not item.get("timestamp"):
                            # Notify the user that items are missing a timestamp
                            timestamp_missing += 1
                            self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")

                        # pseudonymise or anonymise as needed
                        filtering = self.parameters.get("pseudonymise")
                        try:
                            if filtering:
                                for field, value in item.items():
                                    if field is None:
                                        # This would normally be caught when writerow is called
                                        raise CsvDialectException("Field is None")
                                    if field.startswith("author"):
                                        if filtering == "anonymise":
                                            item[field] = "REDACTED"
                                        elif filtering == "pseudonymise":
                                            item[field] = hash_cache.update_cache(value)

                            writer.writerow(item)
                        except ValueError as e:
                            if not possible_dialects:
                                # no dialects left to try; give up
                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
                                                                      "format or edited the CSV after exporting? Try importing "
                                                                      "as custom format.")
                            else:
                                # signal the outer handler to retry with the next dialect
                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")

                        done += 1

                except import_formats.InvalidCustomFormat as e:
                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
                                     "log for details.")
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error(str(e))

                except UnicodeDecodeError:
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
                                                          "Make sure the file is encoded properly and try again.")

                except CsvDialectException:
                    # this dialect did not work out; try the next candidate
                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
                    continue

            # done!
            infile.close()
            # We successfully read the CSV, no need to try other dialects
            break

        if skipped or timestamp_missing:
            error_message = ""
            if timestamp_missing:
                error_message += f"{timestamp_missing:,} items had no timestamp"
            if skipped:
                error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"

            self.dataset.update_status(
                f"CSV file imported, but {error_message}. See dataset log for details.",
                is_final=True)

        temp_file.unlink()
        self.dataset.delete_parameter("filename")
        if skipped and not done:
            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
                                           "the item's timestamp may be in a format that cannot be parsed properly.")
        else:
            self.dataset.finish(done)

    def validate_query(query, request, config):
        """
        Validate custom data input

        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
        some metadata.

        NOTE(review): defined without `self` or @staticmethod — presumably
        invoked as an unbound hook by the frontend; confirm against the framework.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager|None config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        # do we have an uploaded file?
        if "option-data_upload" not in request.files:
            raise QueryParametersException("No file was offered for upload.")

        file = request.files["option-data_upload"]
        if not file:
            raise QueryParametersException("No file was offered for upload.")

        if query.get("format") not in import_formats.tools:
            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")

        # content_length seems unreliable, so figure out the length by reading
        # the file...
        upload_size = 0
        while True:
            bit = file.read(1024)
            if len(bit) == 0:
                break
            upload_size += len(bit)

        file.seek(0)
        encoding = sniff_encoding(file)
        tool_format = import_formats.tools.get(query.get("format"))


        try:
            # try reading the file as csv here
            # never read more than 128 kB (to keep it quick)
            # NOTE(review): upload_size counts bytes but read() below counts
            # characters, so the cap is approximate for multi-byte encodings
            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
            sample = wrapped_file.read(sample_size)

            # sometimes more is actually worse, and the sniffer gets confused
            # so as a back-up sample, use just the header row, which might give
            # results if the full sample fails
            samples = [sample, sample.split("\n")[0]]

            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
                # this may be intended, or the check may be bad, so allow user to continue
                raise QueryNeedsExplicitConfirmationException(
                    "The uploaded file does not seem to have a header row. Continue anyway?")

            wrapped_file.seek(0)
            errors = []
            dialect = None
            # NOTE(review): there is no break after a successful sniff, so the
            # header-row backup sample is also sniffed and may override the
            # dialect guessed from the full sample — verify this is intended
            while samples:
                sample = samples.pop(0)
                try:
                    dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
                except csv.Error as e:
                    errors.append(str(e))
                    # try next sample
                    continue

            if not dialect:
                raise csv.Error(", ".join(errors))

            # override the guesses for specific formats if defined so in
            # import_formats.py
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])

        except UnicodeDecodeError:
            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
                                           "Save the file in the proper format and try again.")
        except csv.Error:
            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

        # With validated csvs, save as is but make sure the raw file is sorted
        reader = csv.DictReader(wrapped_file, dialect=dialect)

        # we know that the CSV file is a CSV file now, next verify whether
        # we know what each column means
        try:
            fields = reader.fieldnames
        except UnicodeDecodeError:
            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

        incomplete_mapping = list(tool_format["columns"])
        for field in tool_format["columns"]:
            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
                incomplete_mapping.remove(field)
            elif not tool_format.get("allow_user_mapping", False) and field in fields:
                incomplete_mapping.remove(field)

        # offer the user a number of select boxes where they can indicate the
        # mapping for each column
        column_mapping = {}
        if tool_format.get("allow_user_mapping", False):
            magic_mappings = {
                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
                "empty": {"__4cat_empty_value": "[empty]"},
                "timestamp": {"__4cat_now": "[current date and time]"}
            }
            if incomplete_mapping:
                raise QueryNeedsFurtherInputException({
                    "mapping-info": {
                        "type": UserInput.OPTION_INFO,
                        "help": "Please confirm which column in the CSV file maps to each required value."
                    },
                    **{
                        "mapping-%s" % mappable_column: {
                            "type": UserInput.OPTION_CHOICE,
                            "options": {
                                "": "",
                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
                                **{column: column for column in fields}
                            },
                            "default": mappable_column if mappable_column in fields else "",
                            "help": mappable_column,
                            "tooltip": tool_format["columns"][mappable_column]
                        } for mappable_column in incomplete_mapping
                    }})

            # the mappings do need to point to a column in the csv file
            missing_mapping = []
            for field in tool_format["columns"]:
                mapping_field = "option-mapping-%s" % field
                provided_field = request.form.get(mapping_field)
                # NOTE(review): if the form omits this field, provided_field is
                # None and .startswith raises AttributeError before the
                # `or not provided_field` guard is evaluated — verify
                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
                    missing_mapping.append(field)
                else:
                    column_mapping["mapping-" + field] = request.form.get(mapping_field)

            if missing_mapping:
                raise QueryParametersException(
                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
                    "columns: %s" % ", ".join(missing_mapping))

        elif incomplete_mapping:
            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
                                           "are missing: %s" % ", ".join(incomplete_mapping))

        # the timestamp column needs to be parseable
        timestamp_column = request.form.get("mapping-timestamp")
        try:
            row = reader.__next__()
            if timestamp_column not in row:
                # incomplete row because we are analysing a sample
                # stop parsing because no complete rows will follow
                raise StopIteration

            if row[timestamp_column]:
                try:
                    if row[timestamp_column].isdecimal():
                        datetime.fromtimestamp(float(row[timestamp_column]))
                    else:
                        parse_datetime(row[timestamp_column])
                except (ValueError, OSError):
                    raise QueryParametersException(
                        "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
                except AttributeError:
                    raise QueryParametersException("Couldn't correctly read the file, try formatting it differently")
            else:
                # the timestamp column is empty or contains empty values
                if not query.get("frontend-confirm"):
                    # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
                    # TODO: frontend-confirm exceptions need to be made unique
                    raise QueryNeedsExplicitConfirmationException(
                        "Your 'timestamp' column contains empty values. Continue anyway?")
                else:
                    # `None` value will be used
                    pass

        except StopIteration:
            pass

        # ok, we're done with the file
        wrapped_file.detach()

        # Whether to strip the HTML tags
        strip_html = False
        if query.get("strip_html"):
            strip_html = True

        # return metadata - the filename is sanitised and serves no purpose at
        # this point in time, but can be used to uniquely identify a dataset
        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
        return {
            "filename": disallowed_characters.sub("", file.filename),
            "time": time.time(),
            "datasource": "upload",
            "board": query.get("format", "custom").replace("_", "-"),
            "format": query.get("format"),
            "strip_html": strip_html,
            **column_mapping,
        }

    def after_create(query, dataset, request):
        """
        Hook to execute after the dataset for this source has been created

        In this case, put the file in a temporary location so it can be
        processed properly by the related Job later.

        :param dict query: Sanitised query parameters
        :param DataSet dataset: Dataset created for this query
        :param request: Flask request submitted for its creation
        """
        file = request.files["option-data_upload"]
        file.seek(0)
        # copy the upload in 1 kB chunks to the ".importing" staging file
        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
            while True:
                chunk = file.read(1024)
                if len(chunk) == 0:
                    break
                outfile.write(chunk)
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can
define an `is_compatible_with(module=None, config=None) -> bool` class
method which takes a dataset as argument and returns a bool that determines
if this processor is considered compatible with that dataset. For example:
@classmethod
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"
@classmethod
def get_options(cls, parent_dataset=None, config=None) -> dict:
    """
    Build the option definitions shown in the upload form.

    :param parent_dataset DataSet: An object representing the dataset that
    the processor would be or was run on. Can be used, in conjunction with
    config, to show some options only to privileged users.
    :param config ConfigManager|None config: Configuration reader (context-aware)
    :return dict: Options for this processor
    """
    # one choice per known import format, labelled with its display name
    format_choices = {tool: info["name"] for tool, info in import_formats.tools.items()}

    options = {}
    options["intro"] = {
        "type": UserInput.OPTION_INFO,
        "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
                "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
                "contain a header row.\n\n"
                "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
                "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
                "can select which column holds which value after uploading the file."
    }
    options["data_upload"] = {
        "type": UserInput.OPTION_FILE,
        "help": "File"
    }
    options["format"] = {
        "type": UserInput.OPTION_CHOICE,
        "help": "CSV format",
        "options": format_choices,
        "default": "custom"
    }
    options["strip_html"] = {
        "type": UserInput.OPTION_TOGGLE,
        "help": "Strip HTML?",
        "default": False,
        "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
    }
    return options
Get processor options
Parameters
- parent_dataset DataSet: An object representing the dataset that the processor would be or was run on. Can be used, in conjunction with config, to show some options only to privileged users.
- config ConfigManager|None: Configuration reader (context-aware)
Returns
Options for this processor
def process(self):
    """
    Process uploaded CSV file

    Applies the provided mapping and makes sure the file is in a format
    4CAT will understand.

    Reads the raw upload that `after_create` staged under the ".importing"
    suffix, tries one or more candidate CSV dialects, runs every row through
    the selected import format's mapper, optionally strips HTML from the
    body and pseudonymises/anonymises author fields, and writes the mapped
    rows to the dataset's results file.
    """
    # the selected import format ("tool") supplies the mapper and expected columns
    tool_format = import_formats.tools.get(self.parameters.get("format"))
    # after_create() stored the raw upload under this temporary name
    temp_file = self.dataset.get_results_path().with_suffix(".importing")
    with temp_file.open("rb") as infile:
        # detect encoding - UTF-8 with or without BOM
        encoding = sniff_encoding(infile)

    # figure out the csv dialect
    # the sniffer is not perfect and sometimes makes mistakes
    # for some formats we already know the dialect, so we can override its
    # guess and set the properties as defined in import_formats.py
    infile = temp_file.open("r", encoding=encoding)
    sample = infile.read(1024 * 1024)  # sniff on the first MB only
    try:
        possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
    except csv.Error:
        # sniffing failed; fall back to trying every registered dialect
        possible_dialects = csv.list_dialects()
    if tool_format.get("csv_dialect", {}):
        # Known dialects are defined in import_formats.py
        # NOTE(review): this second sniff is outside the try above and can
        # raise csv.Error unhandled on an unsniffable sample - confirm intended
        dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
        for prop in tool_format.get("csv_dialect", {}):
            setattr(dialect, prop, tool_format["csv_dialect"][prop])
        possible_dialects.append(dialect)

    # try candidate dialects until one yields a fully parseable file;
    # pop() means the format-specific dialect (appended last) is tried first
    while possible_dialects:
        # With validated csvs, save as is but make sure the raw file is sorted
        infile.seek(0)
        dialect = possible_dialects.pop()  # Use the last dialect first
        self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
        reader = csv.DictReader(infile, dialect=dialect)

        # formats without user-driven column mapping must contain every expected column
        if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
                set(tool_format["columns"]) != set(tool_format["columns"]):
            raise QueryParametersException("Not all columns are present")

        # hasher for pseudonymisation
        salt = secrets.token_bytes(16)
        hasher = hashlib.blake2b(digest_size=24, salt=salt)
        hash_cache = HashCache(hasher)

        # write the resulting dataset
        writer = None
        done = 0
        skipped = 0
        timestamp_missing = 0
        with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
            # mapper is defined in import_formats
            try:
                for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
                    if isinstance(item, import_formats.InvalidImportedItem):
                        # if the mapper returns this class, the item is not written
                        skipped += 1
                        if hasattr(item, "reason"):
                            self.dataset.log(f"Skipping item ({item.reason})")
                        continue

                    if not writer:
                        # the first valid item determines the output columns
                        writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
                        writer.writeheader()

                    if self.parameters.get("strip_html") and "body" in item:
                        item["body"] = strip_tags(item["body"])

                    # check for None/empty timestamp
                    if not item.get("timestamp"):
                        # Notify the user that items are missing a timestamp
                        timestamp_missing += 1
                        self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")

                    # pseudonymise or anonymise as needed
                    filtering = self.parameters.get("pseudonymise")
                    try:
                        if filtering:
                            for field, value in item.items():
                                if field is None:
                                    # This would normally be caught when writerow is called
                                    raise CsvDialectException("Field is None")
                                if field.startswith("author"):
                                    if filtering == "anonymise":
                                        item[field] = "REDACTED"
                                    elif filtering == "pseudonymise":
                                        item[field] = hash_cache.update_cache(value)

                        writer.writerow(item)
                    except ValueError as e:
                        # a row that does not fit the header usually means the
                        # dialect guess was wrong; retry with the next candidate
                        # unless this was the last one
                        if not possible_dialects:
                            self.dataset.log(f"Error ({e}) writing item {i}: {item}")
                            return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
                                                                  "format or edited the CSV after exporting? Try importing "
                                                                  "as custom format.")
                        else:
                            raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")

                    done += 1

            except import_formats.InvalidCustomFormat as e:
                # mapper decided the file does not match the chosen format at all
                self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
                                 "log for details.")
                infile.close()
                temp_file.unlink()
                return self.dataset.finish_with_error(str(e))

            except UnicodeDecodeError:
                infile.close()
                temp_file.unlink()
                return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
                                                      "Make sure the file is encoded properly and try again.")

            except CsvDialectException:
                # wrong dialect guess: log it and loop to try the next candidate
                self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
                continue

        # done!
        infile.close()
        # We successfully read the CSV, no need to try other dialects
        break

    # surface partial-import problems to the user without failing the dataset
    if skipped or timestamp_missing:
        error_message = ""
        if timestamp_missing:
            error_message += f"{timestamp_missing:,} items had no timestamp"
        if skipped:
            error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"

        self.dataset.update_status(
            f"CSV file imported, but {error_message}. See dataset log for details.",
            is_final=True)

    # the staging file is no longer needed; neither is the filename parameter
    temp_file.unlink()
    self.dataset.delete_parameter("filename")
    if skipped and not done:
        self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
                                       "the item's timestamp may be in a format that cannot be parsed properly.")
    else:
        self.dataset.finish(done)
Process uploaded CSV file
Applies the provided mapping and makes sure the file is in a format 4CAT will understand.
def validate_query(query, request, config):
    """
    Validate custom data input

    Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
    some metadata.

    May raise QueryNeedsFurtherInputException or
    QueryNeedsExplicitConfirmationException to round-trip with the frontend
    (column mapping selection, confirmation prompts) before finally
    returning the sanitised parameters.

    :param dict query: Query parameters, from client-side.
    :param request: Flask request
    :param ConfigManager|None config: Configuration reader (context-aware)
    :return dict: Safe query parameters
    """
    # do we have an uploaded file?
    if "option-data_upload" not in request.files:
        raise QueryParametersException("No file was offered for upload.")

    file = request.files["option-data_upload"]
    if not file:
        raise QueryParametersException("No file was offered for upload.")

    if query.get("format") not in import_formats.tools:
        raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")

    # content_length seems unreliable, so figure out the length by reading
    # the file...
    upload_size = 0
    while True:
        bit = file.read(1024)
        if len(bit) == 0:
            break
        upload_size += len(bit)

    file.seek(0)
    # UTF-8 with or without BOM
    encoding = sniff_encoding(file)
    tool_format = import_formats.tools.get(query.get("format"))


    try:
        # try reading the file as csv here
        # never read more than 128 kB (to keep it quick)
        sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
        wrapped_file = io.TextIOWrapper(file, encoding=encoding)
        sample = wrapped_file.read(sample_size)

        # sometimes more is actually worse, and the sniffer gets confused
        # so as a back-up sample, use just the header row, which might give
        # results if the full sample fails
        samples = [sample, sample.split("\n")[0]]

        if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
            # this may be intended, or the check may be bad, so allow user to continue
            raise QueryNeedsExplicitConfirmationException(
                "The uploaded file does not seem to have a header row. Continue anyway?")

        wrapped_file.seek(0)
        errors = []
        dialect = None
        # try the full sample first, then the header-only backup; a failed
        # sniff moves on to the next sample rather than aborting
        # NOTE(review): a successful sniff does not break out of the loop, so
        # a later sample may overwrite an earlier guess - confirm intended
        while samples:
            sample = samples.pop(0)
            try:
                dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
            except csv.Error as e:
                errors.append(str(e))
                # try next sample
                continue

        if not dialect:
            # every sample failed; re-raise with all collected sniffer errors
            raise csv.Error(", ".join(errors))

        # override the guesses for specific formats if defined so in
        # import_formats.py
        for prop in tool_format.get("csv_dialect", {}):
            setattr(dialect, prop, tool_format["csv_dialect"][prop])

    except UnicodeDecodeError:
        raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
                                       "Save the file in the proper format and try again.")
    except csv.Error:
        raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

    # With validated csvs, save as is but make sure the raw file is sorted
    reader = csv.DictReader(wrapped_file, dialect=dialect)

    # we know that the CSV file is a CSV file now, next verify whether
    # we know what each column means
    try:
        fields = reader.fieldnames
    except UnicodeDecodeError:
        raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

    # a required column is "complete" when the user mapped it (for mappable
    # formats) or when it literally occurs in the file's header (otherwise)
    incomplete_mapping = list(tool_format["columns"])
    for field in tool_format["columns"]:
        if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
            incomplete_mapping.remove(field)
        elif not tool_format.get("allow_user_mapping", False) and field in fields:
            incomplete_mapping.remove(field)

    # offer the user a number of select boxes where they can indicate the
    # mapping for each column
    column_mapping = {}
    if tool_format.get("allow_user_mapping", False):
        # special non-column choices offered per mappable field; the
        # "__4cat_*" sentinel values are handled by the mapper later
        magic_mappings = {
            "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
            "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
            "empty": {"__4cat_empty_value": "[empty]"},
            "timestamp": {"__4cat_now": "[current date and time]"}
        }
        if incomplete_mapping:
            # round-trip to the frontend: ask the user to map each missing column
            raise QueryNeedsFurtherInputException({
                "mapping-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please confirm which column in the CSV file maps to each required value."
                },
                **{
                    "mapping-%s" % mappable_column: {
                        "type": UserInput.OPTION_CHOICE,
                        "options": {
                            "": "",
                            **magic_mappings.get(mappable_column, magic_mappings["empty"]),
                            **{column: column for column in fields}
                        },
                        "default": mappable_column if mappable_column in fields else "",
                        "help": mappable_column,
                        "tooltip": tool_format["columns"][mappable_column]
                    } for mappable_column in incomplete_mapping
                }})

        # the mappings do need to point to a column in the csv file
        missing_mapping = []
        for field in tool_format["columns"]:
            mapping_field = "option-mapping-%s" % field
            provided_field = request.form.get(mapping_field)
            # at this point incomplete_mapping is empty, so every mapping key
            # exists in the form; provided_field may be "" but not None
            if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
                missing_mapping.append(field)
            else:
                column_mapping["mapping-" + field] = request.form.get(mapping_field)

        if missing_mapping:
            raise QueryParametersException(
                "You need to indicate which column in the CSV file holds the corresponding value for the following "
                "columns: %s" % ", ".join(missing_mapping))

    elif incomplete_mapping:
        raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
                                       "are missing: %s" % ", ".join(incomplete_mapping))

    # the timestamp column needs to be parseable
    # spot-check the first data row only
    timestamp_column = request.form.get("mapping-timestamp")
    try:
        row = reader.__next__()
        if timestamp_column not in row:
            # incomplete row because we are analysing a sample
            # stop parsing because no complete rows will follow
            raise StopIteration

        if row[timestamp_column]:
            try:
                if row[timestamp_column].isdecimal():
                    # numeric value: interpret as a unix timestamp
                    datetime.fromtimestamp(float(row[timestamp_column]))
                else:
                    parse_datetime(row[timestamp_column])
            except (ValueError, OSError):
                raise QueryParametersException(
                    "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
            except AttributeError:
                raise QueryParametersException("Couldn't correctly read the file, try formatting it differently")
        else:
            # the timestamp column is empty or contains empty values
            if not query.get("frontend-confirm"):
                # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
                # TODO: frontend-confirm exceptions need to be made unique
                raise QueryNeedsExplicitConfirmationException(
                    "Your 'timestamp' column contains empty values. Continue anyway?")
            else:
                # `None` value will be used
                pass

    except StopIteration:
        pass

    # ok, we're done with the file
    wrapped_file.detach()

    # Whether to strip the HTML tags
    strip_html = False
    if query.get("strip_html"):
        strip_html = True

    # return metadata - the filename is sanitised and serves no purpose at
    # this point in time, but can be used to uniquely identify a dataset
    disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
    return {
        "filename": disallowed_characters.sub("", file.filename),
        "time": time.time(),
        "datasource": "upload",
        "board": query.get("format", "custom").replace("_", "-"),
        "format": query.get("format"),
        "strip_html": strip_html,
        **column_mapping,
    }
Validate custom data input
Confirms that the uploaded file is a valid CSV or tab file and, if so, returns some metadata.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
def after_create(query, dataset, request):
    """
    Hook to execute after the dataset for this source has been created

    In this case, put the file in a temporary location so it can be
    processed properly by the related Job later.

    :param dict query: Sanitised query parameters
    :param DataSet dataset: Dataset created for this query
    :param request: Flask request submitted for its creation
    """
    uploaded = request.files["option-data_upload"]
    # validate_query() has already read the stream; rewind before copying
    uploaded.seek(0)
    staging_path = dataset.get_results_path().with_suffix(".importing")
    with staging_path.open("wb") as outfile:
        # copy in 1 kB chunks until the upload stream is exhausted
        for chunk in iter(lambda: uploaded.read(1024), b""):
            outfile.write(chunk)
Hook to execute after the dataset for this source has been created
In this case, put the file in a temporary location so it can be processed properly by the related Job later.
Parameters
- dict query: Sanitised query parameters
- DataSet dataset: Dataset created for this query
- request: Flask request submitted for its creation
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- run_interruptable_process
- get_queue_id
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- config
- is_running_in_preset
- filepath
- for_cleanup
- work
- after_process
- clean_up_on_error
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor