datasources.upload.import_csv
Custom data upload to create bespoke datasets
1""" 2Custom data upload to create bespoke datasets 3""" 4import secrets 5import hashlib 6import time 7import csv 8import re 9import io 10 11import datasources.upload.import_formats as import_formats 12 13from dateutil.parser import parse as parse_datetime 14from datetime import datetime 15 16from backend.lib.processor import BasicProcessor 17from common.lib.exceptions import QueryParametersException, QueryNeedsFurtherInputException, \ 18 QueryNeedsExplicitConfirmationException, CsvDialectException 19from common.lib.helpers import strip_tags, sniff_encoding, UserInput, HashCache 20 21 22class SearchCustom(BasicProcessor): 23 type = "upload-search" # job ID 24 category = "Search" # category 25 title = "Custom Dataset Upload" # title displayed in UI 26 description = "Upload your own CSV file to be used as a dataset" # description displayed in UI 27 extension = "csv" # extension of result file, used internally and in UI 28 is_local = False # Whether this datasource is locally scraped 29 is_static = False # Whether this datasource is still updated 30 31 max_workers = 1 32 options = { 33 "intro": { 34 "type": UserInput.OPTION_INFO, 35 "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis " 36 "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must " 37 "contain a header row.\n\n" 38 "You can indicate what format the file has or upload one with arbitrary structure. In the latter " 39 "case, for each item, columns describing its ID, author, timestamp, and content are expected. You " 40 "can select which column holds which value after uploading the file." 41 }, 42 "data_upload": { 43 "type": UserInput.OPTION_FILE, 44 "help": "File" 45 }, 46 "format": { 47 "type": UserInput.OPTION_CHOICE, 48 "help": "CSV format", 49 "options": { 50 tool: info["name"] for tool, info in import_formats.tools.items() 51 }, 52 "default": "custom" 53 }, 54 "strip_html": { 55 "type": UserInput.OPTION_TOGGLE, 56 "help": "Strip HTML?", 57 "default": False, 58 "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)" 59 } 60 } 61 62 def process(self): 63 """ 64 Process uploaded CSV file 65 66 Applies the provided mapping and makes sure the file is in a format 67 4CAT will understand. 
68 """ 69 tool_format = import_formats.tools.get(self.parameters.get("format")) 70 temp_file = self.dataset.get_results_path().with_suffix(".importing") 71 with temp_file.open("rb") as infile: 72 # detect encoding - UTF-8 with or without BOM 73 encoding = sniff_encoding(infile) 74 75 # figure out the csv dialect 76 # the sniffer is not perfect and sometimes makes mistakes 77 # for some formats we already know the dialect, so we can override its 78 # guess and set the properties as defined in import_formats.py 79 infile = temp_file.open("r", encoding=encoding) 80 sample = infile.read(1024 * 1024) 81 try: 82 possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))] 83 except csv.Error: 84 possible_dialects = csv.list_dialects() 85 if tool_format.get("csv_dialect", {}): 86 # Known dialects are defined in import_formats.py 87 dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t")) 88 for prop in tool_format.get("csv_dialect", {}): 89 setattr(dialect, prop, tool_format["csv_dialect"][prop]) 90 possible_dialects.append(dialect) 91 92 while possible_dialects: 93 # With validated csvs, save as is but make sure the raw file is sorted 94 infile.seek(0) 95 dialect = possible_dialects.pop() # Use the last dialect first 96 self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) == csv.Dialect else dialect}") 97 reader = csv.DictReader(infile, dialect=dialect) 98 99 if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \ 100 set(tool_format["columns"]) != set(tool_format["columns"]): 101 raise QueryParametersException("Not all columns are present") 102 103 # hasher for pseudonymisation 104 salt = secrets.token_bytes(16) 105 hasher = hashlib.blake2b(digest_size=24, salt=salt) 106 hash_cache = HashCache(hasher) 107 108 # write the resulting dataset 109 writer = None 110 done = 0 111 skipped = 0 112 with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv: 113 # mapper is defined in import_formats 114 try: 115 for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)): 116 if isinstance(item, import_formats.InvalidImportedItem): 117 # if the mapper returns this class, the item is not written 118 skipped += 1 119 if hasattr(item, "reason"): 120 self.dataset.log(f"Skipping item ({item.reason})") 121 continue 122 123 if not writer: 124 writer = csv.DictWriter(output_csv, fieldnames=list(item.keys())) 125 writer.writeheader() 126 127 if self.parameters.get("strip_html") and "body" in item: 128 item["body"] = strip_tags(item["body"]) 129 130 # pseudonymise or anonymise as needed 131 filtering = self.parameters.get("pseudonymise") 132 try: 133 if filtering: 134 for field, value in item.items(): 135 if field is None: 136 # This would normally be caught when writerow is called 137 raise CsvDialectException("Field is None") 138 if field.startswith("author"): 139 if filtering == "anonymise": 140 item[field] = "REDACTED" 141 elif filtering == "pseudonymise": 142 item[field] = hash_cache.update_cache(value) 143 144 writer.writerow(item) 145 except ValueError as e: 146 if not possible_dialects: 147 self.dataset.log(f"Error ({e}) writing item {i}: {item}") 148 return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct " 149 "format or edited the CSV after exporting? 
Try importing " 150 "as custom format.") 151 else: 152 raise CsvDialectException(f"Error ({e}) writing item {i}: {item}") 153 154 done += 1 155 156 except import_formats.InvalidCustomFormat as e: 157 self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset " 158 "log for details.") 159 infile.close() 160 temp_file.unlink() 161 return self.dataset.finish_with_error(str(e)) 162 163 except UnicodeDecodeError as e: 164 infile.close() 165 temp_file.unlink() 166 return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. " 167 "Make sure the file is encoded properly and try again.") 168 169 except CsvDialectException as e: 170 self.dataset.log(f"Error with CSV dialect: {vars(dialect)}") 171 continue 172 173 # done! 174 infile.close() 175 # We successfully read the CSV, no need to try other dialects 176 break 177 178 if skipped: 179 self.dataset.update_status( 180 f"CSV file imported, but {skipped:,} items were skipped because their date could not be parsed.", 181 is_final=True) 182 183 temp_file.unlink() 184 self.dataset.delete_parameter("filename") 185 if skipped and not done: 186 self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing " 187 "the item's timestamp may be in a format that cannot be parsed properly.") 188 else: 189 self.dataset.finish(done) 190 191 def validate_query(query, request, user): 192 """ 193 Validate custom data input 194 195 Confirms that the uploaded file is a valid CSV or tab file and, if so, returns 196 some metadata. 197 198 :param dict query: Query parameters, from client-side. 199 :param request: Flask request 200 :param User user: User object of user who has submitted the query 201 :return dict: Safe query parameters 202 """ 203 # do we have an uploaded file? 204 if "option-data_upload" not in request.files: 205 raise QueryParametersException("No file was offered for upload.") 206 207 file = request.files["option-data_upload"] 208 if not file: 209 raise QueryParametersException("No file was offered for upload.") 210 211 if query.get("format") not in import_formats.tools: 212 raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}") 213 214 # content_length seems unreliable, so figure out the length by reading 215 # the file... 216 upload_size = 0 217 while True: 218 bit = file.read(1024) 219 if len(bit) == 0: 220 break 221 upload_size += len(bit) 222 223 file.seek(0) 224 encoding = sniff_encoding(file) 225 tool_format = import_formats.tools.get(query.get("format")) 226 227 try: 228 # try reading the file as csv here 229 # never read more than 128 kB (to keep it quick) 230 sample_size = min(upload_size, 128 * 1024) # 128 kB is sent from the frontend at most 231 wrapped_file = io.TextIOWrapper(file, encoding=encoding) 232 sample = wrapped_file.read(sample_size) 233 234 if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"): 235 # this may be intended, or the check may be bad, so allow user to continue 236 raise QueryNeedsExplicitConfirmationException( 237 "The uploaded file does not seem to have a header row. 
Continue anyway?") 238 239 wrapped_file.seek(0) 240 dialect = csv.Sniffer().sniff(sample, delimiters=",;\t") 241 242 # override the guesses for specific formats if defined so in 243 # import_formats.py 244 for prop in tool_format.get("csv_dialect", {}): 245 setattr(dialect, prop, tool_format["csv_dialect"][prop]) 246 247 except UnicodeDecodeError as e: 248 raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. " 249 "Save the file in the proper format and try again.") 250 except csv.Error: 251 raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.") 252 253 # With validated csvs, save as is but make sure the raw file is sorted 254 reader = csv.DictReader(wrapped_file, dialect=dialect) 255 256 # we know that the CSV file is a CSV file now, next verify whether 257 # we know what each column means 258 try: 259 fields = reader.fieldnames 260 except UnicodeDecodeError: 261 raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.") 262 263 incomplete_mapping = list(tool_format["columns"]) 264 for field in tool_format["columns"]: 265 if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form: 266 incomplete_mapping.remove(field) 267 elif not tool_format.get("allow_user_mapping", False) and field in fields: 268 incomplete_mapping.remove(field) 269 270 # offer the user a number of select boxes where they can indicate the 271 # mapping for each column 272 column_mapping = {} 273 if tool_format.get("allow_user_mapping", False): 274 magic_mappings = { 275 "id": {"__4cat_auto_sequence": "[generate sequential IDs]"}, 276 "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"}, 277 "empty": {"__4cat_empty_value": "[empty]"}, 278 "timestamp": {"__4cat_now": "[current date and time]"} 279 } 280 if incomplete_mapping: 281 raise QueryNeedsFurtherInputException({ 282 "mapping-info": { 283 "type": UserInput.OPTION_INFO, 284 "help": "Please confirm which column in the CSV file maps to each required value." 285 }, 286 **{ 287 "mapping-%s" % mappable_column: { 288 "type": UserInput.OPTION_CHOICE, 289 "options": { 290 "": "", 291 **magic_mappings.get(mappable_column, magic_mappings["empty"]), 292 **{column: column for column in fields} 293 }, 294 "default": mappable_column if mappable_column in fields else "", 295 "help": mappable_column, 296 "tooltip": tool_format["columns"][mappable_column] 297 } for mappable_column in incomplete_mapping 298 }}) 299 300 # the mappings do need to point to a column in the csv file 301 missing_mapping = [] 302 for field in tool_format["columns"]: 303 mapping_field = "option-mapping-%s" % field 304 provided_field = request.form.get(mapping_field) 305 if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field: 306 missing_mapping.append(field) 307 else: 308 column_mapping["mapping-" + field] = request.form.get(mapping_field) 309 310 if missing_mapping: 311 raise QueryParametersException( 312 "You need to indicate which column in the CSV file holds the corresponding value for the following " 313 "columns: %s" % ", ".join(missing_mapping)) 314 315 elif incomplete_mapping: 316 raise QueryParametersException("The CSV file does not contain all required columns. 
The following columns " 317 "are missing: %s" % ", ".join(incomplete_mapping)) 318 319 # the timestamp column needs to be parseable 320 timestamp_column = request.form.get("mapping-timestamp") 321 try: 322 row = reader.__next__() 323 if timestamp_column not in row: 324 # incomplete row because we are analysing a sample 325 # stop parsing because no complete rows will follow 326 raise StopIteration 327 328 try: 329 if row[timestamp_column].isdecimal(): 330 datetime.fromtimestamp(float(row[timestamp_column])) 331 else: 332 parse_datetime(row[timestamp_column]) 333 except (ValueError, OSError): 334 raise QueryParametersException( 335 "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)") 336 337 except StopIteration: 338 pass 339 340 # ok, we're done with the file 341 wrapped_file.detach() 342 343 # Whether to strip the HTML tags 344 strip_html = False 345 if query.get("strip_html"): 346 strip_html = True 347 348 # return metadata - the filename is sanitised and serves no purpose at 349 # this point in time, but can be used to uniquely identify a dataset 350 disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]") 351 return { 352 "filename": disallowed_characters.sub("", file.filename), 353 "time": time.time(), 354 "datasource": "upload", 355 "board": query.get("format", "custom").replace("_", "-"), 356 "format": query.get("format"), 357 "strip_html": strip_html, 358 **column_mapping, 359 } 360 361 def after_create(query, dataset, request): 362 """ 363 Hook to execute after the dataset for this source has been created 364 365 In this case, put the file in a temporary location so it can be 366 processed properly by the related Job later. 367 368 :param dict query: Sanitised query parameters 369 :param DataSet dataset: Dataset created for this query 370 :param request: Flask request submitted for its creation 371 """ 372 file = request.files["option-data_upload"] 373 file.seek(0) 374 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 375 while True: 376 chunk = file.read(1024) 377 if len(chunk) == 0: 378 break 379 outfile.write(chunk)
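Everything format-specific in this module is looked up in `import_formats.tools`, which is defined elsewhere (in `datasources/upload/import_formats.py`). Judging from how `tool_format` is used in `process()` and `validate_query()`, an entry in that registry can be sketched roughly as follows; the field names are real, but the descriptions, the dialect override, and the stub mapper are placeholders:

```python
def map_custom_items(reader, columns, dataset, parameters):
    # placeholder mapper: a real mapper normalises each row into the fields
    # 4CAT expects and may yield InvalidImportedItem for unusable rows
    for row in reader:
        yield row


# hypothetical entry, inferred from how this module reads tool_format
tools = {
    "custom": {
        "name": "Custom/other",             # label shown in the "CSV format" dropdown
        "columns": {                        # required fields -> tooltip shown when mapping
            "id": "A unique identifier for the item",
            "author": "The author of the item",
            "body": "The text content of the item",
            "timestamp": "When the item was posted",
        },
        "allow_user_mapping": True,         # let users map their own CSV columns to these fields
        "csv_dialect": {"delimiter": ","},  # optional overrides applied to the sniffed csv.Dialect
        "mapper": map_custom_items,         # called as mapper(reader, columns, dataset, parameters)
    },
}
```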
class SearchCustom(BasicProcessor):
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, producing another dataset as output. The input is thus a file, and the output (usually) is as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can define an `is_compatible_with(FourcatModule module=None, str user=None) -> bool` class method, which takes a dataset as its argument and returns a bool indicating whether this processor is considered compatible with that dataset. For example:
```python
@classmethod
def is_compatible_with(cls, module=None, user=None):
    return module.type == "linguistic-features"
```
def process(self):
Process uploaded CSV file
Applies the provided mapping and makes sure the file is in a format 4CAT will understand.
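The per-row conversion is delegated to the `mapper` registered for the selected format; `process()` only decides whether to keep, skip, or post-process what the mapper yields. A minimal sketch of such a mapper, assuming `InvalidImportedItem` accepts an optional `reason` keyword (the `hasattr(item, "reason")` check in the source suggests the attribute is optional):

```python
from datetime import datetime

from datasources.upload.import_formats import InvalidImportedItem


def map_example_items(reader, columns, dataset, parameters):
    """Hypothetical mapper: yield one dict per CSV row, or an
    InvalidImportedItem for rows that cannot be imported."""
    for row in reader:
        try:
            # parse the raw timestamp so that unparseable rows can be skipped;
            # what the mapper ultimately stores is up to the format definition
            parsed = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
        except (KeyError, ValueError):
            # counted as "skipped" by process(), with the reason written to the log
            yield InvalidImportedItem(reason="could not parse timestamp")
            continue

        yield {
            "id": row.get("id", ""),
            "author": row.get("author", ""),
            "body": row.get("body", ""),
            "timestamp": parsed.strftime("%Y-%m-%d %H:%M:%S"),
        }
```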
def validate_query(query, request, user):
Validate custom data input
Confirms that the uploaded file is a valid CSV or tab file and, if so, returns some metadata.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
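For a custom-format upload where the user has mapped each required column, the returned metadata might look roughly like this (illustrative values; the set of `mapping-*` keys depends on the format's required columns):

```python
metadata = {
    "filename": "my_export.csv",
    "time": 1700000000.0,          # time.time() at validation
    "datasource": "upload",
    "board": "custom",             # the format key, with "_" replaced by "-"
    "format": "custom",
    "strip_html": False,
    # one "mapping-<field>" entry per required column, as chosen by the user
    "mapping-id": "id",
    "mapping-author": "author",
    "mapping-body": "body",
    "mapping-timestamp": "timestamp",
}
```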
def after_create(query, dataset, request):
Hook to execute after the dataset for this source has been created
In this case, put the file in a temporary location so it can be processed properly by the related Job later.
Parameters
- dict query: Sanitised query parameters
- DataSet dataset: Dataset created for this query
- request: Flask request submitted for its creation
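The "temporary location" is simply the dataset's result path with an `.importing` suffix; `process()` reads the upload back from that same path and removes it once the import has finished. A minimal sketch of the convention (paths are illustrative):

```python
from pathlib import Path

# illustrative paths only; the real ones come from DataSet.get_results_path()
results_path = Path("data/dataset-abc123.csv")
temp_path = results_path.with_suffix(".importing")  # data/dataset-abc123.importing

# after_create() streams the uploaded file into temp_path in 1 kB chunks;
# process() later reads temp_path, writes the mapped items to results_path,
# and unlinks temp_path when it is done.
```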
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor