datasources.upload.import_csv
Custom data upload to create bespoke datasets
"""
Custom data upload to create bespoke datasets
"""
import secrets
import hashlib
import time
import csv
import re
import io

import datasources.upload.import_formats as import_formats

from dateutil.parser import parse as parse_datetime
from datetime import datetime

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import QueryParametersException, QueryNeedsFurtherInputException, \
    QueryNeedsExplicitConfirmationException, CsvDialectException
from common.lib.helpers import strip_tags, sniff_encoding, UserInput, HashCache


class SearchCustom(BasicProcessor):
    """
    Data source that turns a user-uploaded CSV or TAB file into a 4CAT dataset.

    `validate_query` runs at submission time (in the web frontend) to check the
    upload and collect column mappings; `after_create` stashes the raw upload
    next to the dataset; `process` runs in the backend worker and converts the
    stashed file into the final result CSV using the mapper configured for the
    selected format in import_formats.py.
    """
    type = "upload-search"  # job ID
    category = "Search"  # category
    title = "Custom Dataset Upload"  # title displayed in UI
    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
    extension = "csv"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1
    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
                    "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
                    "contain a header row.\n\n"
                    "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
                    "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
                    "can select which column holds which value after uploading the file."
        },
        "data_upload": {
            "type": UserInput.OPTION_FILE,
            "help": "File"
        },
        "format": {
            "type": UserInput.OPTION_CHOICE,
            "help": "CSV format",
            "options": {
                tool: info["name"] for tool, info in import_formats.tools.items()
            },
            "default": "custom"
        },
        "strip_html": {
            "type": UserInput.OPTION_TOGGLE,
            "help": "Strip HTML?",
            "default": False,
            "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
        }
    }

    def process(self):
        """
        Process uploaded CSV file

        Applies the provided mapping and makes sure the file is in a format
        4CAT will understand.
        """
        tool_format = import_formats.tools.get(self.parameters.get("format"))
        temp_file = self.dataset.get_results_path().with_suffix(".importing")
        with temp_file.open("rb") as infile:
            # detect encoding - UTF-8 with or without BOM
            encoding = sniff_encoding(infile)

        # figure out the csv dialect
        # the sniffer is not perfect and sometimes makes mistakes
        # for some formats we already know the dialect, so we can override its
        # guess and set the properties as defined in import_formats.py
        infile = temp_file.open("r", encoding=encoding)
        sample = infile.read(1024 * 1024)
        try:
            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
        except csv.Error:
            # sniffing failed; fall back to trying every registered dialect
            # NOTE: list_dialects() returns dialect *names* (strings), which
            # DictReader accepts, but which have no __dict__ for vars()
            possible_dialects = csv.list_dialects()
        if tool_format.get("csv_dialect", {}):
            # Known dialects are defined in import_formats.py
            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])
            possible_dialects.append(dialect)

        while possible_dialects:
            # With validated csvs, save as is but make sure the raw file is sorted
            infile.seek(0)
            dialect = possible_dialects.pop()  # Use the last dialect first
            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
            reader = csv.DictReader(infile, dialect=dialect)

            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
                    set(tool_format["columns"]) != set(tool_format["columns"]):
                raise QueryParametersException("Not all columns are present")

            # hasher for pseudonymisation
            salt = secrets.token_bytes(16)
            hasher = hashlib.blake2b(digest_size=24, salt=salt)
            hash_cache = HashCache(hasher)

            # write the resulting dataset
            writer = None
            done = 0
            skipped = 0
            timestamp_missing = 0
            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
                # mapper is defined in import_formats
                try:
                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
                        if isinstance(item, import_formats.InvalidImportedItem):
                            # if the mapper returns this class, the item is not written
                            skipped += 1
                            if hasattr(item, "reason"):
                                self.dataset.log(f"Skipping item ({item.reason})")
                            continue

                        if not writer:
                            # first valid item determines the output columns
                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
                            writer.writeheader()

                        if self.parameters.get("strip_html") and "body" in item:
                            item["body"] = strip_tags(item["body"])

                        # check for None/empty timestamp
                        if not item.get("timestamp"):
                            # Notify the user that items are missing a timestamp
                            timestamp_missing += 1
                            self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")

                        # pseudonymise or anonymise as needed
                        filtering = self.parameters.get("pseudonymise")
                        try:
                            if filtering:
                                for field, value in item.items():
                                    if field is None:
                                        # This would normally be caught when writerow is called
                                        raise CsvDialectException("Field is None")
                                    if field.startswith("author"):
                                        if filtering == "anonymise":
                                            item[field] = "REDACTED"
                                        elif filtering == "pseudonymise":
                                            item[field] = hash_cache.update_cache(value)

                            writer.writerow(item)
                        except ValueError as e:
                            if not possible_dialects:
                                # no other dialects left to try; give up
                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
                                                                      "format or edited the CSV after exporting? Try importing "
                                                                      "as custom format.")
                            else:
                                # retry with the next candidate dialect
                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")

                        done += 1

                except import_formats.InvalidCustomFormat as e:
                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
                                     "log for details.")
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error(str(e))

                except UnicodeDecodeError:
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
                                                          "Make sure the file is encoded properly and try again.")

                except CsvDialectException:
                    # FIX: dialect may be a string name (from csv.list_dialects()),
                    # on which vars() raises TypeError; guard like the import log above
                    self.dataset.log(f"Error with CSV dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
                    continue

            # done!
            infile.close()
            # We successfully read the CSV, no need to try other dialects
            break

        if skipped or timestamp_missing:
            error_message = ""
            if timestamp_missing:
                error_message += f"{timestamp_missing:,} items had no timestamp"
            if skipped:
                error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"

            self.dataset.update_status(
                f"CSV file imported, but {error_message}. See dataset log for details.",
                is_final=True)

        temp_file.unlink()
        self.dataset.delete_parameter("filename")
        if skipped and not done:
            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
                                           "the item's timestamp may be in a format that cannot be parsed properly.")
        else:
            self.dataset.finish(done)

    def validate_query(query, request, config):
        """
        Validate custom data input

        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
        some metadata.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager|None config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        # do we have an uploaded file?
        if "option-data_upload" not in request.files:
            raise QueryParametersException("No file was offered for upload.")

        file = request.files["option-data_upload"]
        if not file:
            raise QueryParametersException("No file was offered for upload.")

        if query.get("format") not in import_formats.tools:
            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")

        # content_length seems unreliable, so figure out the length by reading
        # the file...
        upload_size = 0
        while True:
            bit = file.read(1024)
            if len(bit) == 0:
                break
            upload_size += len(bit)

        file.seek(0)
        encoding = sniff_encoding(file)
        tool_format = import_formats.tools.get(query.get("format"))

        try:
            # try reading the file as csv here
            # never read more than 128 kB (to keep it quick)
            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
            sample = wrapped_file.read(sample_size)

            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
                # this may be intended, or the check may be bad, so allow user to continue
                raise QueryNeedsExplicitConfirmationException(
                    "The uploaded file does not seem to have a header row. Continue anyway?")

            wrapped_file.seek(0)
            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")

            # override the guesses for specific formats if defined so in
            # import_formats.py
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])

        except UnicodeDecodeError:
            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
                                           "Save the file in the proper format and try again.")
        except csv.Error:
            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

        # With validated csvs, save as is but make sure the raw file is sorted
        reader = csv.DictReader(wrapped_file, dialect=dialect)

        # we know that the CSV file is a CSV file now, next verify whether
        # we know what each column means
        try:
            fields = reader.fieldnames
        except UnicodeDecodeError:
            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

        incomplete_mapping = list(tool_format["columns"])
        for field in tool_format["columns"]:
            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
                incomplete_mapping.remove(field)
            elif not tool_format.get("allow_user_mapping", False) and field in fields:
                incomplete_mapping.remove(field)

        # offer the user a number of select boxes where they can indicate the
        # mapping for each column
        column_mapping = {}
        if tool_format.get("allow_user_mapping", False):
            magic_mappings = {
                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
                "empty": {"__4cat_empty_value": "[empty]"},
                "timestamp": {"__4cat_now": "[current date and time]"}
            }
            if incomplete_mapping:
                raise QueryNeedsFurtherInputException({
                    "mapping-info": {
                        "type": UserInput.OPTION_INFO,
                        "help": "Please confirm which column in the CSV file maps to each required value."
                    },
                    **{
                        "mapping-%s" % mappable_column: {
                            "type": UserInput.OPTION_CHOICE,
                            "options": {
                                "": "",
                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
                                **{column: column for column in fields}
                            },
                            "default": mappable_column if mappable_column in fields else "",
                            "help": mappable_column,
                            "tooltip": tool_format["columns"][mappable_column]
                        } for mappable_column in incomplete_mapping
                    }})

            # the mappings do need to point to a column in the csv file
            missing_mapping = []
            for field in tool_format["columns"]:
                mapping_field = "option-mapping-%s" % field
                provided_field = request.form.get(mapping_field)
                # FIX: check falsiness first - provided_field is None when the
                # form field is absent, and None.startswith() raises AttributeError
                if not provided_field or (provided_field not in fields and not provided_field.startswith("__4cat")):
                    missing_mapping.append(field)
                else:
                    column_mapping["mapping-" + field] = request.form.get(mapping_field)

            if missing_mapping:
                raise QueryParametersException(
                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
                    "columns: %s" % ", ".join(missing_mapping))

        elif incomplete_mapping:
            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
                                           "are missing: %s" % ", ".join(incomplete_mapping))

        # the timestamp column needs to be parseable
        timestamp_column = request.form.get("mapping-timestamp")
        try:
            row = reader.__next__()
            if timestamp_column not in row:
                # incomplete row because we are analysing a sample
                # stop parsing because no complete rows will follow
                raise StopIteration

            if row[timestamp_column]:
                try:
                    if row[timestamp_column].isdecimal():
                        datetime.fromtimestamp(float(row[timestamp_column]))
                    else:
                        parse_datetime(row[timestamp_column])
                except (ValueError, OSError):
                    raise QueryParametersException(
                        "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
            else:
                # the timestamp column is empty or contains empty values
                if not query.get("frontend-confirm"):
                    # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
                    # TODO: frontend-confirm exceptions need to be made unique
                    raise QueryNeedsExplicitConfirmationException(
                        "Your 'timestamp' column contains empty values. Continue anyway?")
                else:
                    # `None` value will be used
                    pass

        except StopIteration:
            pass

        # ok, we're done with the file
        wrapped_file.detach()

        # Whether to strip the HTML tags
        strip_html = False
        if query.get("strip_html"):
            strip_html = True

        # return metadata - the filename is sanitised and serves no purpose at
        # this point in time, but can be used to uniquely identify a dataset
        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
        return {
            "filename": disallowed_characters.sub("", file.filename),
            "time": time.time(),
            "datasource": "upload",
            "board": query.get("format", "custom").replace("_", "-"),
            "format": query.get("format"),
            "strip_html": strip_html,
            **column_mapping,
        }

    def after_create(query, dataset, request):
        """
        Hook to execute after the dataset for this source has been created

        In this case, put the file in a temporary location so it can be
        processed properly by the related Job later.

        :param dict query: Sanitised query parameters
        :param DataSet dataset: Dataset created for this query
        :param request: Flask request submitted for its creation
        """
        file = request.files["option-data_upload"]
        file.seek(0)
        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
            while True:
                chunk = file.read(1024)
                if len(chunk) == 0:
                    break
                outfile.write(chunk)
class SearchCustom(BasicProcessor):
    """
    Data source that turns a user-uploaded CSV or TAB file into a 4CAT dataset.

    Validation and column mapping happen in `validate_query` (frontend),
    the raw upload is stashed by `after_create`, and `process` (backend
    worker) converts the stashed file to the final result CSV using the
    mapper configured in import_formats.py for the chosen format.
    """
    type = "upload-search"  # job ID
    category = "Search"  # category
    title = "Custom Dataset Upload"  # title displayed in UI
    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
    extension = "csv"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1
    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
                    "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
                    "contain a header row.\n\n"
                    "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
                    "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
                    "can select which column holds which value after uploading the file."
        },
        "data_upload": {
            "type": UserInput.OPTION_FILE,
            "help": "File"
        },
        "format": {
            "type": UserInput.OPTION_CHOICE,
            "help": "CSV format",
            "options": {
                tool: info["name"] for tool, info in import_formats.tools.items()
            },
            "default": "custom"
        },
        "strip_html": {
            "type": UserInput.OPTION_TOGGLE,
            "help": "Strip HTML?",
            "default": False,
            "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
        }
    }

    def process(self):
        """
        Process uploaded CSV file

        Applies the provided mapping and makes sure the file is in a format
        4CAT will understand.
        """
        tool_format = import_formats.tools.get(self.parameters.get("format"))
        temp_file = self.dataset.get_results_path().with_suffix(".importing")
        with temp_file.open("rb") as infile:
            # detect encoding - UTF-8 with or without BOM
            encoding = sniff_encoding(infile)

        # figure out the csv dialect
        # the sniffer is not perfect and sometimes makes mistakes
        # for some formats we already know the dialect, so we can override its
        # guess and set the properties as defined in import_formats.py
        infile = temp_file.open("r", encoding=encoding)
        sample = infile.read(1024 * 1024)
        try:
            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
        except csv.Error:
            # sniffing failed: fall back to all registered dialect *names*
            # (strings, which DictReader also accepts)
            possible_dialects = csv.list_dialects()
        if tool_format.get("csv_dialect", {}):
            # Known dialects are defined in import_formats.py
            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])
            possible_dialects.append(dialect)

        while possible_dialects:
            # With validated csvs, save as is but make sure the raw file is sorted
            infile.seek(0)
            dialect = possible_dialects.pop()  # Use the last dialect first
            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
            reader = csv.DictReader(infile, dialect=dialect)

            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
                    set(tool_format["columns"]) != set(tool_format["columns"]):
                raise QueryParametersException("Not all columns are present")

            # hasher for pseudonymisation
            salt = secrets.token_bytes(16)
            hasher = hashlib.blake2b(digest_size=24, salt=salt)
            hash_cache = HashCache(hasher)

            # write the resulting dataset
            writer = None
            done = 0
            skipped = 0
            timestamp_missing = 0
            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
                # mapper is defined in import_formats
                try:
                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
                        if isinstance(item, import_formats.InvalidImportedItem):
                            # if the mapper returns this class, the item is not written
                            skipped += 1
                            if hasattr(item, "reason"):
                                self.dataset.log(f"Skipping item ({item.reason})")
                            continue

                        if not writer:
                            # first valid item determines the output columns
                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
                            writer.writeheader()

                        if self.parameters.get("strip_html") and "body" in item:
                            item["body"] = strip_tags(item["body"])

                        # check for None/empty timestamp
                        if not item.get("timestamp"):
                            # Notify the user that items are missing a timestamp
                            timestamp_missing += 1
                            self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")

                        # pseudonymise or anonymise as needed
                        filtering = self.parameters.get("pseudonymise")
                        try:
                            if filtering:
                                for field, value in item.items():
                                    if field is None:
                                        # This would normally be caught when writerow is called
                                        raise CsvDialectException("Field is None")
                                    if field.startswith("author"):
                                        if filtering == "anonymise":
                                            item[field] = "REDACTED"
                                        elif filtering == "pseudonymise":
                                            item[field] = hash_cache.update_cache(value)

                            writer.writerow(item)
                        except ValueError as e:
                            if not possible_dialects:
                                # no dialects left to try; give up on the file
                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
                                                                      "format or edited the CSV after exporting? Try importing "
                                                                      "as custom format.")
                            else:
                                # retry the file with the next candidate dialect
                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")

                        done += 1

                except import_formats.InvalidCustomFormat as e:
                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
                                     "log for details.")
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error(str(e))

                except UnicodeDecodeError:
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
                                                          "Make sure the file is encoded properly and try again.")

                except CsvDialectException:
                    # NOTE(review): vars() raises TypeError when dialect is a
                    # string name from csv.list_dialects() — TODO confirm/guard
                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
                    continue

            # done!
            infile.close()
            # We successfully read the CSV, no need to try other dialects
            break

        if skipped or timestamp_missing:
            error_message = ""
            if timestamp_missing:
                error_message += f"{timestamp_missing:,} items had no timestamp"
            if skipped:
                error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"

            self.dataset.update_status(
                f"CSV file imported, but {error_message}. See dataset log for details.",
                is_final=True)

        temp_file.unlink()
        self.dataset.delete_parameter("filename")
        if skipped and not done:
            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
                                           "the item's timestamp may be in a format that cannot be parsed properly.")
        else:
            self.dataset.finish(done)

    def validate_query(query, request, config):
        """
        Validate custom data input

        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
        some metadata.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager|None config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        # do we have an uploaded file?
        if "option-data_upload" not in request.files:
            raise QueryParametersException("No file was offered for upload.")

        file = request.files["option-data_upload"]
        if not file:
            raise QueryParametersException("No file was offered for upload.")

        if query.get("format") not in import_formats.tools:
            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")

        # content_length seems unreliable, so figure out the length by reading
        # the file...
        upload_size = 0
        while True:
            bit = file.read(1024)
            if len(bit) == 0:
                break
            upload_size += len(bit)

        file.seek(0)
        encoding = sniff_encoding(file)
        tool_format = import_formats.tools.get(query.get("format"))

        try:
            # try reading the file as csv here
            # never read more than 128 kB (to keep it quick)
            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
            sample = wrapped_file.read(sample_size)

            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
                # this may be intended, or the check may be bad, so allow user to continue
                raise QueryNeedsExplicitConfirmationException(
                    "The uploaded file does not seem to have a header row. Continue anyway?")

            wrapped_file.seek(0)
            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")

            # override the guesses for specific formats if defined so in
            # import_formats.py
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])

        except UnicodeDecodeError:
            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
                                           "Save the file in the proper format and try again.")
        except csv.Error:
            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

        # With validated csvs, save as is but make sure the raw file is sorted
        reader = csv.DictReader(wrapped_file, dialect=dialect)

        # we know that the CSV file is a CSV file now, next verify whether
        # we know what each column means
        try:
            fields = reader.fieldnames
        except UnicodeDecodeError:
            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

        incomplete_mapping = list(tool_format["columns"])
        for field in tool_format["columns"]:
            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
                incomplete_mapping.remove(field)
            elif not tool_format.get("allow_user_mapping", False) and field in fields:
                incomplete_mapping.remove(field)

        # offer the user a number of select boxes where they can indicate the
        # mapping for each column
        column_mapping = {}
        if tool_format.get("allow_user_mapping", False):
            # special mapping targets that generate values instead of reading a column
            magic_mappings = {
                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
                "empty": {"__4cat_empty_value": "[empty]"},
                "timestamp": {"__4cat_now": "[current date and time]"}
            }
            if incomplete_mapping:
                raise QueryNeedsFurtherInputException({
                    "mapping-info": {
                        "type": UserInput.OPTION_INFO,
                        "help": "Please confirm which column in the CSV file maps to each required value."
                    },
                    **{
                        "mapping-%s" % mappable_column: {
                            "type": UserInput.OPTION_CHOICE,
                            "options": {
                                "": "",
                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
                                **{column: column for column in fields}
                            },
                            "default": mappable_column if mappable_column in fields else "",
                            "help": mappable_column,
                            "tooltip": tool_format["columns"][mappable_column]
                        } for mappable_column in incomplete_mapping
                    }})

            # the mappings do need to point to a column in the csv file
            missing_mapping = []
            for field in tool_format["columns"]:
                mapping_field = "option-mapping-%s" % field
                provided_field = request.form.get(mapping_field)
                # NOTE(review): if the form field is absent, provided_field is
                # None and .startswith raises AttributeError before the
                # `or not provided_field` guard is reached — TODO confirm
                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
                    missing_mapping.append(field)
                else:
                    column_mapping["mapping-" + field] = request.form.get(mapping_field)

            if missing_mapping:
                raise QueryParametersException(
                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
                    "columns: %s" % ", ".join(missing_mapping))

        elif incomplete_mapping:
            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
                                           "are missing: %s" % ", ".join(incomplete_mapping))

        # the timestamp column needs to be parseable
        timestamp_column = request.form.get("mapping-timestamp")
        try:
            row = reader.__next__()
            if timestamp_column not in row:
                # incomplete row because we are analysing a sample
                # stop parsing because no complete rows will follow
                raise StopIteration

            if row[timestamp_column]:
                try:
                    if row[timestamp_column].isdecimal():
                        datetime.fromtimestamp(float(row[timestamp_column]))
                    else:
                        parse_datetime(row[timestamp_column])
                except (ValueError, OSError):
                    raise QueryParametersException(
                        "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
            else:
                # the timestamp column is empty or contains empty values
                if not query.get("frontend-confirm"):
                    # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
                    # TODO: frontend-confirm exceptions need to be made unique
                    raise QueryNeedsExplicitConfirmationException(
                        "Your 'timestamp' column contains empty values. Continue anyway?")
                else:
                    # `None` value will be used
                    pass

        except StopIteration:
            pass

        # ok, we're done with the file
        wrapped_file.detach()

        # Whether to strip the HTML tags
        strip_html = False
        if query.get("strip_html"):
            strip_html = True

        # return metadata - the filename is sanitised and serves no purpose at
        # this point in time, but can be used to uniquely identify a dataset
        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
        return {
            "filename": disallowed_characters.sub("", file.filename),
            "time": time.time(),
            "datasource": "upload",
            "board": query.get("format", "custom").replace("_", "-"),
            "format": query.get("format"),
            "strip_html": strip_html,
            **column_mapping,
        }

    def after_create(query, dataset, request):
        """
        Hook to execute after the dataset for this source has been created

        In this case, put the file in a temporary location so it can be
        processed properly by the related Job later.

        :param dict query: Sanitised query parameters
        :param DataSet dataset: Dataset created for this query
        :param request: Flask request submitted for its creation
        """
        file = request.files["option-data_upload"]
        file.seek(0)
        # copy the upload in 1 kB chunks to <results path>.importing
        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
            while True:
                chunk = file.read(1024)
                if len(chunk) == 0:
                    break
                outfile.write(chunk)
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can
define an `is_compatible_with(module: FourcatModule = None, config=None) -> bool` class
method which takes a dataset as argument and returns a bool that determines
if this processor is considered compatible with that dataset. For example:
@classmethod
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"
def process(self):
    """
    Process uploaded CSV file

    Applies the provided mapping and makes sure the file is in a format
    4CAT will understand.
    """
    # tool_format describes the source format (columns, mapper, known CSV
    # dialect); `None` if the format parameter is unknown — TODO confirm
    # that cannot happen here, since validate_query already rejects it
    tool_format = import_formats.tools.get(self.parameters.get("format"))
    temp_file = self.dataset.get_results_path().with_suffix(".importing")
    with temp_file.open("rb") as infile:
        # detect encoding - UTF-8 with or without BOM
        encoding = sniff_encoding(infile)

    # figure out the csv dialect
    # the sniffer is not perfect and sometimes makes mistakes
    # for some formats we already know the dialect, so we can override its
    # guess and set the properties as defined in import_formats.py
    # NOTE(review): infile is closed explicitly on each exit path below
    # rather than via a context manager; an unexpected exception would
    # leave it open
    infile = temp_file.open("r", encoding=encoding)
    # sniff on at most 1 MiB of the file
    sample = infile.read(1024 * 1024)
    try:
        possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
    except csv.Error:
        # sniffing failed; fall back to trying every registered dialect name
        possible_dialects = csv.list_dialects()
    if tool_format.get("csv_dialect", {}):
        # Known dialects are defined in import_formats.py
        dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
        for prop in tool_format.get("csv_dialect", {}):
            setattr(dialect, prop, tool_format["csv_dialect"][prop])
        # appended last so it is tried first (dialects are popped from the end)
        possible_dialects.append(dialect)

    # try each candidate dialect until the file parses cleanly
    while possible_dialects:
        # With validated csvs, save as is but make sure the raw file is sorted
        infile.seek(0)
        dialect = possible_dialects.pop()  # Use the last dialect first
        self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
        reader = csv.DictReader(infile, dialect=dialect)

        # for fixed formats (no user mapping), all expected columns must exist
        if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
                set(tool_format["columns"]) != set(tool_format["columns"]):
            raise QueryParametersException("Not all columns are present")

        # hasher for pseudonymisation
        # fresh random salt per import, so hashes are not comparable
        # across datasets
        salt = secrets.token_bytes(16)
        hasher = hashlib.blake2b(digest_size=24, salt=salt)
        hash_cache = HashCache(hasher)

        # write the resulting dataset
        writer = None  # created lazily, once the first item's keys are known
        done = 0
        skipped = 0
        timestamp_missing = 0
        with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
            # mapper is defined in import_formats
            try:
                for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
                    if isinstance(item, import_formats.InvalidImportedItem):
                        # if the mapper returns this class, the item is not written
                        skipped += 1
                        if hasattr(item, "reason"):
                            self.dataset.log(f"Skipping item ({item.reason})")
                        continue

                    if not writer:
                        writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
                        writer.writeheader()

                    if self.parameters.get("strip_html") and "body" in item:
                        item["body"] = strip_tags(item["body"])

                    # check for None/empty timestamp
                    if not item.get("timestamp"):
                        # Notify the user that items are missing a timestamp
                        timestamp_missing += 1
                        self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")

                    # pseudonymise or anonymise as needed
                    filtering = self.parameters.get("pseudonymise")
                    try:
                        if filtering:
                            for field, value in item.items():
                                if field is None:
                                    # This would normally be caught when writerow is called
                                    raise CsvDialectException("Field is None")
                                # every column whose name starts with "author"
                                # is treated as identifying
                                if field.startswith("author"):
                                    if filtering == "anonymise":
                                        item[field] = "REDACTED"
                                    elif filtering == "pseudonymise":
                                        item[field] = hash_cache.update_cache(value)

                        writer.writerow(item)
                    except ValueError as e:
                        if not possible_dialects:
                            # no dialects left to try; give up with a user-facing error
                            self.dataset.log(f"Error ({e}) writing item {i}: {item}")
                            return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
                                                                  "format or edited the CSV after exporting? Try importing "
                                                                  "as custom format.")
                        else:
                            # escalate so the outer handler retries with the next dialect
                            raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")

                    done += 1

            except import_formats.InvalidCustomFormat as e:
                self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
                                 "log for details.")
                infile.close()
                temp_file.unlink()
                return self.dataset.finish_with_error(str(e))

            except UnicodeDecodeError:
                infile.close()
                temp_file.unlink()
                return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
                                                      "Make sure the file is encoded properly and try again.")

            except CsvDialectException:
                # this dialect did not work out; try the next candidate
                self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
                continue

        # done!
        infile.close()
        # We successfully read the CSV, no need to try other dialects
        break

    # summarise partial failures for the user
    if skipped or timestamp_missing:
        error_message = ""
        if timestamp_missing:
            error_message += f"{timestamp_missing:,} items had no timestamp"
        if skipped:
            error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"

        self.dataset.update_status(
            f"CSV file imported, but {error_message}. See dataset log for details.",
            is_final=True)

    temp_file.unlink()
    self.dataset.delete_parameter("filename")
    if skipped and not done:
        self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
                                       "the item's timestamp may be in a format that cannot be parsed properly.")
    else:
        self.dataset.finish(done)
Process uploaded CSV file
Applies the provided mapping and makes sure the file is in a format 4CAT will understand.
def validate_query(query, request, config):
    """
    Validate custom data input

    Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
    some metadata.

    :param dict query:  Query parameters, from client-side.
    :param request:  Flask request
    :param ConfigManager|None config:  Configuration reader (context-aware)
    :return dict:  Safe query parameters
    """
    # do we have an uploaded file?
    if "option-data_upload" not in request.files:
        raise QueryParametersException("No file was offered for upload.")

    file = request.files["option-data_upload"]
    if not file:
        raise QueryParametersException("No file was offered for upload.")

    if query.get("format") not in import_formats.tools:
        raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")

    # content_length seems unreliable, so figure out the length by reading
    # the file...
    upload_size = 0
    while True:
        bit = file.read(1024)
        if len(bit) == 0:
            break
        upload_size += len(bit)

    file.seek(0)
    encoding = sniff_encoding(file)
    tool_format = import_formats.tools.get(query.get("format"))

    try:
        # try reading the file as csv here
        # never read more than 128 kB (to keep it quick)
        sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
        wrapped_file = io.TextIOWrapper(file, encoding=encoding)
        sample = wrapped_file.read(sample_size)

        if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
            # this may be intended, or the check may be bad, so allow user to continue
            raise QueryNeedsExplicitConfirmationException(
                "The uploaded file does not seem to have a header row. Continue anyway?")

        wrapped_file.seek(0)
        dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")

        # override the guesses for specific formats if defined so in
        # import_formats.py
        for prop in tool_format.get("csv_dialect", {}):
            setattr(dialect, prop, tool_format["csv_dialect"][prop])

    except UnicodeDecodeError:
        raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
                                       "Save the file in the proper format and try again.")
    except csv.Error:
        raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

    # With validated csvs, save as is but make sure the raw file is sorted
    reader = csv.DictReader(wrapped_file, dialect=dialect)

    # we know that the CSV file is a CSV file now, next verify whether
    # we know what each column means
    try:
        fields = reader.fieldnames
    except UnicodeDecodeError:
        raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")

    # required columns for which no mapping (implicit or user-provided)
    # exists yet
    incomplete_mapping = list(tool_format["columns"])
    for field in tool_format["columns"]:
        if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
            incomplete_mapping.remove(field)
        elif not tool_format.get("allow_user_mapping", False) and field in fields:
            incomplete_mapping.remove(field)

    # offer the user a number of select boxes where they can indicate the
    # mapping for each column
    column_mapping = {}
    if tool_format.get("allow_user_mapping", False):
        # special "__4cat_*" values let the user generate a column instead
        # of picking one from the file
        magic_mappings = {
            "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
            "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
            "empty": {"__4cat_empty_value": "[empty]"},
            "timestamp": {"__4cat_now": "[current date and time]"}
        }
        if incomplete_mapping:
            # ask the frontend to render one select box per unmapped column
            raise QueryNeedsFurtherInputException({
                "mapping-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please confirm which column in the CSV file maps to each required value."
                },
                **{
                    "mapping-%s" % mappable_column: {
                        "type": UserInput.OPTION_CHOICE,
                        "options": {
                            "": "",
                            **magic_mappings.get(mappable_column, magic_mappings["empty"]),
                            **{column: column for column in fields}
                        },
                        "default": mappable_column if mappable_column in fields else "",
                        "help": mappable_column,
                        "tooltip": tool_format["columns"][mappable_column]
                    } for mappable_column in incomplete_mapping
                }})

        # the mappings do need to point to a column in the csv file
        missing_mapping = []
        for field in tool_format["columns"]:
            mapping_field = "option-mapping-%s" % field
            provided_field = request.form.get(mapping_field)
            # NOTE(review): if provided_field is None, `.startswith` here
            # would raise AttributeError before the `or not provided_field`
            # guard runs — presumably unreachable because incomplete_mapping
            # is empty at this point, but worth confirming
            if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
                missing_mapping.append(field)
            else:
                column_mapping["mapping-" + field] = request.form.get(mapping_field)

        if missing_mapping:
            raise QueryParametersException(
                "You need to indicate which column in the CSV file holds the corresponding value for the following "
                "columns: %s" % ", ".join(missing_mapping))

    elif incomplete_mapping:
        raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
                                       "are missing: %s" % ", ".join(incomplete_mapping))

    # the timestamp column needs to be parseable
    timestamp_column = request.form.get("mapping-timestamp")
    try:
        # inspect only the first data row of the sample
        row = reader.__next__()
        if timestamp_column not in row:
            # incomplete row because we are analysing a sample
            # stop parsing because no complete rows will follow
            raise StopIteration

        if row[timestamp_column]:
            try:
                if row[timestamp_column].isdecimal():
                    # numeric value: treat as a Unix timestamp
                    datetime.fromtimestamp(float(row[timestamp_column]))
                else:
                    parse_datetime(row[timestamp_column])
            except (ValueError, OSError):
                raise QueryParametersException(
                    "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
        else:
            # the timestamp column is empty or contains empty values
            if not query.get("frontend-confirm"):
                # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
                # TODO: frontend-confirm exceptions need to be made unique
                raise QueryNeedsExplicitConfirmationException(
                    "Your 'timestamp' column contains empty values. Continue anyway?")
            else:
                # `None` value will be used
                pass

    except StopIteration:
        pass

    # ok, we're done with the file
    # detach (not close) so the underlying upload stream stays usable for
    # after_create
    wrapped_file.detach()

    # Whether to strip the HTML tags
    strip_html = False
    if query.get("strip_html"):
        strip_html = True

    # return metadata - the filename is sanitised and serves no purpose at
    # this point in time, but can be used to uniquely identify a dataset
    disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
    return {
        "filename": disallowed_characters.sub("", file.filename),
        "time": time.time(),
        "datasource": "upload",
        "board": query.get("format", "custom").replace("_", "-"),
        "format": query.get("format"),
        "strip_html": strip_html,
        **column_mapping,
    }
Validate custom data input
Confirms that the uploaded file is a valid CSV or tab file and, if so, returns some metadata.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
def after_create(query, dataset, request):
    """
    Hook to execute after the dataset for this source has been created

    In this case, put the file in a temporary location so it can be
    processed properly by the related Job later.

    :param dict query:  Sanitised query parameters
    :param DataSet dataset:  Dataset created for this query
    :param request:  Flask request submitted for its creation
    """
    # local import: shutil is only needed for this one-off copy
    import shutil

    file = request.files["option-data_upload"]
    # rewind: validate_query has already read (part of) this stream
    file.seek(0)

    # copy the upload to "<results path>.importing"; process() later reads
    # and deletes this temporary file. copyfileobj streams in large chunks,
    # replacing the slow hand-rolled 1 KiB read/write loop.
    with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
        shutil.copyfileobj(file, outfile)
Hook to execute after the dataset for this source has been created
In this case, put the file in a temporary location so it can be processed properly by the related Job later.
Parameters
- dict query: Sanitised query parameters
- DataSet dataset: Dataset created for this query
- request: Flask request submitted for its creation
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor