Edit on GitHub

datasources.upload.import_csv

Custom data upload to create bespoke datasets

  1"""
  2Custom data upload to create bespoke datasets
  3"""
  4import secrets
  5import hashlib
  6import time
  7import csv
  8import re
  9import io
 10
 11import datasources.upload.import_formats as import_formats
 12
 13from dateutil.parser import parse as parse_datetime
 14from datetime import datetime
 15
 16from backend.lib.processor import BasicProcessor
 17from common.lib.exceptions import QueryParametersException, QueryNeedsFurtherInputException, \
 18    QueryNeedsExplicitConfirmationException, CsvDialectException
 19from common.lib.helpers import strip_tags, sniff_encoding, UserInput, HashCache
 20
 21
 22class SearchCustom(BasicProcessor):
 23    type = "upload-search"  # job ID
 24    category = "Search"  # category
 25    title = "Custom Dataset Upload"  # title displayed in UI
 26    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
 27    extension = "csv"  # extension of result file, used internally and in UI
 28    is_local = False  # Whether this datasource is locally scraped
 29    is_static = False  # Whether this datasource is still updated
 30
 31    max_workers = 1
 32    options = {
 33        "intro": {
 34            "type": UserInput.OPTION_INFO,
 35            "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
 36                    "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
 37                    "contain a header row.\n\n"
 38                    "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
 39                    "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
 40                    "can select which column holds which value after uploading the file."
 41        },
 42        "data_upload": {
 43            "type": UserInput.OPTION_FILE,
 44            "help": "File"
 45        },
 46        "format": {
 47            "type": UserInput.OPTION_CHOICE,
 48            "help": "CSV format",
 49            "options": {
 50                tool: info["name"] for tool, info in import_formats.tools.items()
 51            },
 52            "default": "custom"
 53        },
 54        "strip_html": {
 55            "type": UserInput.OPTION_TOGGLE,
 56            "help": "Strip HTML?",
 57            "default": False,
 58            "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
 59        }
 60    }
 61
 62    def process(self):
 63        """
 64        Process uploaded CSV file
 65
 66        Applies the provided mapping and makes sure the file is in a format
 67        4CAT will understand.
 68        """
 69        tool_format = import_formats.tools.get(self.parameters.get("format"))
 70        temp_file = self.dataset.get_results_path().with_suffix(".importing")
 71        with temp_file.open("rb") as infile:
 72            # detect encoding - UTF-8 with or without BOM
 73            encoding = sniff_encoding(infile)
 74
 75        # figure out the csv dialect
 76        # the sniffer is not perfect and sometimes makes mistakes
 77        # for some formats we already know the dialect, so we can override its
 78        # guess and set the properties as defined in import_formats.py
 79        infile = temp_file.open("r", encoding=encoding)
 80        sample = infile.read(1024 * 1024)
 81        try:
 82            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
 83        except csv.Error:
 84            possible_dialects = csv.list_dialects()
 85        if tool_format.get("csv_dialect", {}):
 86            # Known dialects are defined in import_formats.py
 87            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 88            for prop in tool_format.get("csv_dialect", {}):
 89                setattr(dialect, prop, tool_format["csv_dialect"][prop])
 90            possible_dialects.append(dialect)
 91
 92        while possible_dialects:
 93            # With validated csvs, save as is but make sure the raw file is sorted
 94            infile.seek(0)
 95            dialect = possible_dialects.pop() # Use the last dialect first
 96            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
 97            reader = csv.DictReader(infile, dialect=dialect)
 98
 99            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
100                    set(tool_format["columns"]) != set(tool_format["columns"]):
101                raise QueryParametersException("Not all columns are present")
102
103            # hasher for pseudonymisation
104            salt = secrets.token_bytes(16)
105            hasher = hashlib.blake2b(digest_size=24, salt=salt)
106            hash_cache = HashCache(hasher)
107
108            # write the resulting dataset
109            writer = None
110            done = 0
111            skipped = 0
112            timestamp_missing = 0
113            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
114                # mapper is defined in import_formats
115                try:
116                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
117                        if isinstance(item, import_formats.InvalidImportedItem):
118                            # if the mapper returns this class, the item is not written
119                            skipped += 1
120                            if hasattr(item, "reason"):
121                                self.dataset.log(f"Skipping item ({item.reason})")
122                            continue
123
124                        if not writer:
125                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
126                            writer.writeheader()
127
128                        if self.parameters.get("strip_html") and "body" in item:
129                            item["body"] = strip_tags(item["body"])
130
131                        # check for None/empty timestamp
132                        if not item.get("timestamp"):
133                            # Notify the user that items are missing a timestamp
134                            timestamp_missing += 1
135                            self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")
136
137                        # pseudonymise or anonymise as needed
138                        filtering = self.parameters.get("pseudonymise")
139                        try:
140                            if filtering:
141                                for field, value in item.items():
142                                    if field is None:
143                                        # This would normally be caught when writerow is called
144                                        raise CsvDialectException("Field is None")
145                                    if field.startswith("author"):
146                                        if filtering == "anonymise":
147                                            item[field] = "REDACTED"
148                                        elif filtering == "pseudonymise":
149                                            item[field] = hash_cache.update_cache(value)
150
151                            writer.writerow(item)
152                        except ValueError as e:
153                            if not possible_dialects:
154                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
155                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
156                                                                      "format or edited the CSV after exporting? Try importing "
157                                                                      "as custom format.")
158                            else:
159                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")
160
161                        done += 1
162
163                except import_formats.InvalidCustomFormat as e:
164                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
165                                     "log for details.")
166                    infile.close()
167                    temp_file.unlink()
168                    return self.dataset.finish_with_error(str(e))
169
170                except UnicodeDecodeError:
171                    infile.close()
172                    temp_file.unlink()
173                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
174                                                          "Make sure the file is encoded properly and try again.")
175
176                except CsvDialectException:
177                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
178                    continue
179
180            # done!
181            infile.close()
182            # We successfully read the CSV, no need to try other dialects
183            break
184
185        if skipped or timestamp_missing:
186            error_message = ""
187            if timestamp_missing:
188                error_message += f"{timestamp_missing:,} items had no timestamp"
189            if skipped:
190                error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"
191            
192            self.dataset.update_status(
193                f"CSV file imported, but {error_message}. See dataset log for details.",
194                is_final=True)
195
196        temp_file.unlink()
197        self.dataset.delete_parameter("filename")
198        if skipped and not done:
199            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
200                                           "the item's timestamp may be in a format that cannot be parsed properly.")
201        else:
202            self.dataset.finish(done)
203
204    def validate_query(query, request, config):
205        """
206        Validate custom data input
207
208        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
209        some metadata.
210
211        :param dict query:  Query parameters, from client-side.
212        :param request:  Flask request
213        :param ConfigManager|None config:  Configuration reader (context-aware)
214        :return dict:  Safe query parameters
215        """
216        # do we have an uploaded file?
217        if "option-data_upload" not in request.files:
218            raise QueryParametersException("No file was offered for upload.")
219
220        file = request.files["option-data_upload"]
221        if not file:
222            raise QueryParametersException("No file was offered for upload.")
223
224        if query.get("format") not in import_formats.tools:
225            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")
226
227        # content_length seems unreliable, so figure out the length by reading
228        # the file...
229        upload_size = 0
230        while True:
231            bit = file.read(1024)
232            if len(bit) == 0:
233                break
234            upload_size += len(bit)
235
236        file.seek(0)
237        encoding = sniff_encoding(file)
238        tool_format = import_formats.tools.get(query.get("format"))
239
240        try:
241            # try reading the file as csv here
242            # never read more than 128 kB (to keep it quick)
243            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
244            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
245            sample = wrapped_file.read(sample_size)
246
247            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
248                # this may be intended, or the check may be bad, so allow user to continue
249                raise QueryNeedsExplicitConfirmationException(
250                    "The uploaded file does not seem to have a header row. Continue anyway?")
251
252            wrapped_file.seek(0)
253            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
254
255            # override the guesses for specific formats if defined so in
256            # import_formats.py
257            for prop in tool_format.get("csv_dialect", {}):
258                setattr(dialect, prop, tool_format["csv_dialect"][prop])
259
260        except UnicodeDecodeError:
261            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
262                                           "Save the file in the proper format and try again.")
263        except csv.Error:
264            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
265
266        # With validated csvs, save as is but make sure the raw file is sorted
267        reader = csv.DictReader(wrapped_file, dialect=dialect)
268
269        # we know that the CSV file is a CSV file now, next verify whether
270        # we know what each column means
271        try:
272            fields = reader.fieldnames
273        except UnicodeDecodeError:
274            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
275
276        incomplete_mapping = list(tool_format["columns"])
277        for field in tool_format["columns"]:
278            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
279                incomplete_mapping.remove(field)
280            elif not tool_format.get("allow_user_mapping", False) and field in fields:
281                incomplete_mapping.remove(field)
282
283        # offer the user a number of select boxes where they can indicate the
284        # mapping for each column
285        column_mapping = {}
286        if tool_format.get("allow_user_mapping", False):
287            magic_mappings = {
288                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
289                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
290                "empty": {"__4cat_empty_value": "[empty]"},
291                "timestamp": {"__4cat_now": "[current date and time]"}
292            }
293            if incomplete_mapping:
294                raise QueryNeedsFurtherInputException({
295                    "mapping-info": {
296                        "type": UserInput.OPTION_INFO,
297                        "help": "Please confirm which column in the CSV file maps to each required value."
298                    },
299                    **{
300                        "mapping-%s" % mappable_column: {
301                            "type": UserInput.OPTION_CHOICE,
302                            "options": {
303                                "": "",
304                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
305                                **{column: column for column in fields}
306                            },
307                            "default": mappable_column if mappable_column in fields else "",
308                            "help": mappable_column,
309                            "tooltip": tool_format["columns"][mappable_column]
310                        } for mappable_column in incomplete_mapping
311                    }})
312
313            # the mappings do need to point to a column in the csv file
314            missing_mapping = []
315            for field in tool_format["columns"]:
316                mapping_field = "option-mapping-%s" % field
317                provided_field = request.form.get(mapping_field)
318                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
319                    missing_mapping.append(field)
320                else:
321                    column_mapping["mapping-" + field] = request.form.get(mapping_field)
322
323            if missing_mapping:
324                raise QueryParametersException(
325                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
326                    "columns: %s" % ", ".join(missing_mapping))
327
328        elif incomplete_mapping:
329            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
330                                           "are missing: %s" % ", ".join(incomplete_mapping))
331
332        # the timestamp column needs to be parseable
333        timestamp_column = request.form.get("mapping-timestamp")
334        try:
335            row = reader.__next__()
336            if timestamp_column not in row:
337                # incomplete row because we are analysing a sample
338                # stop parsing because no complete rows will follow
339                raise StopIteration
340
341            if row[timestamp_column]:
342                try:
343                    if row[timestamp_column].isdecimal():
344                        datetime.fromtimestamp(float(row[timestamp_column]))
345                    else:
346                        parse_datetime(row[timestamp_column])
347                except (ValueError, OSError):
348                    raise QueryParametersException(
349                        "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
350            else:
351                # the timestamp column is empty or contains empty values
352                if not query.get("frontend-confirm"):
353                    # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
354                    # TODO: frontend-confirm exceptions need to be made unique
355                    raise QueryNeedsExplicitConfirmationException(
356                        "Your 'timestamp' column contains empty values. Continue anyway?")
357                else:
358                    # `None` value will be used
359                    pass
360
361        except StopIteration:
362            pass
363
364        # ok, we're done with the file
365        wrapped_file.detach()
366
367        # Whether to strip the HTML tags
368        strip_html = False
369        if query.get("strip_html"):
370            strip_html = True
371
372        # return metadata - the filename is sanitised and serves no purpose at
373        # this point in time, but can be used to uniquely identify a dataset
374        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
375        return {
376            "filename": disallowed_characters.sub("", file.filename),
377            "time": time.time(),
378            "datasource": "upload",
379            "board": query.get("format", "custom").replace("_", "-"),
380            "format": query.get("format"),
381            "strip_html": strip_html,
382            **column_mapping,
383        }
384
385    def after_create(query, dataset, request):
386        """
387        Hook to execute after the dataset for this source has been created
388
389        In this case, put the file in a temporary location so it can be
390        processed properly by the related Job later.
391
392        :param dict query:  Sanitised query parameters
393        :param DataSet dataset:  Dataset created for this query
394        :param request:  Flask request submitted for its creation
395        """
396        file = request.files["option-data_upload"]
397        file.seek(0)
398        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
399            while True:
400                chunk = file.read(1024)
401                if len(chunk) == 0:
402                    break
403                outfile.write(chunk)
class SearchCustom(backend.lib.processor.BasicProcessor):
 23class SearchCustom(BasicProcessor):
 24    type = "upload-search"  # job ID
 25    category = "Search"  # category
 26    title = "Custom Dataset Upload"  # title displayed in UI
 27    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
 28    extension = "csv"  # extension of result file, used internally and in UI
 29    is_local = False  # Whether this datasource is locally scraped
 30    is_static = False  # Whether this datasource is still updated
 31
 32    max_workers = 1
 33    options = {
 34        "intro": {
 35            "type": UserInput.OPTION_INFO,
 36            "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
 37                    "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
 38                    "contain a header row.\n\n"
 39                    "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
 40                    "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
 41                    "can select which column holds which value after uploading the file."
 42        },
 43        "data_upload": {
 44            "type": UserInput.OPTION_FILE,
 45            "help": "File"
 46        },
 47        "format": {
 48            "type": UserInput.OPTION_CHOICE,
 49            "help": "CSV format",
 50            "options": {
 51                tool: info["name"] for tool, info in import_formats.tools.items()
 52            },
 53            "default": "custom"
 54        },
 55        "strip_html": {
 56            "type": UserInput.OPTION_TOGGLE,
 57            "help": "Strip HTML?",
 58            "default": False,
 59            "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
 60        }
 61    }
 62
 63    def process(self):
 64        """
 65        Process uploaded CSV file
 66
 67        Applies the provided mapping and makes sure the file is in a format
 68        4CAT will understand.
 69        """
 70        tool_format = import_formats.tools.get(self.parameters.get("format"))
 71        temp_file = self.dataset.get_results_path().with_suffix(".importing")
 72        with temp_file.open("rb") as infile:
 73            # detect encoding - UTF-8 with or without BOM
 74            encoding = sniff_encoding(infile)
 75
 76        # figure out the csv dialect
 77        # the sniffer is not perfect and sometimes makes mistakes
 78        # for some formats we already know the dialect, so we can override its
 79        # guess and set the properties as defined in import_formats.py
 80        infile = temp_file.open("r", encoding=encoding)
 81        sample = infile.read(1024 * 1024)
 82        try:
 83            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
 84        except csv.Error:
 85            possible_dialects = csv.list_dialects()
 86        if tool_format.get("csv_dialect", {}):
 87            # Known dialects are defined in import_formats.py
 88            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 89            for prop in tool_format.get("csv_dialect", {}):
 90                setattr(dialect, prop, tool_format["csv_dialect"][prop])
 91            possible_dialects.append(dialect)
 92
 93        while possible_dialects:
 94            # With validated csvs, save as is but make sure the raw file is sorted
 95            infile.seek(0)
 96            dialect = possible_dialects.pop() # Use the last dialect first
 97            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
 98            reader = csv.DictReader(infile, dialect=dialect)
 99
100            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
101                    set(tool_format["columns"]) != set(tool_format["columns"]):
102                raise QueryParametersException("Not all columns are present")
103
104            # hasher for pseudonymisation
105            salt = secrets.token_bytes(16)
106            hasher = hashlib.blake2b(digest_size=24, salt=salt)
107            hash_cache = HashCache(hasher)
108
109            # write the resulting dataset
110            writer = None
111            done = 0
112            skipped = 0
113            timestamp_missing = 0
114            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
115                # mapper is defined in import_formats
116                try:
117                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
118                        if isinstance(item, import_formats.InvalidImportedItem):
119                            # if the mapper returns this class, the item is not written
120                            skipped += 1
121                            if hasattr(item, "reason"):
122                                self.dataset.log(f"Skipping item ({item.reason})")
123                            continue
124
125                        if not writer:
126                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
127                            writer.writeheader()
128
129                        if self.parameters.get("strip_html") and "body" in item:
130                            item["body"] = strip_tags(item["body"])
131
132                        # check for None/empty timestamp
133                        if not item.get("timestamp"):
134                            # Notify the user that items are missing a timestamp
135                            timestamp_missing += 1
136                            self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")
137
138                        # pseudonymise or anonymise as needed
139                        filtering = self.parameters.get("pseudonymise")
140                        try:
141                            if filtering:
142                                for field, value in item.items():
143                                    if field is None:
144                                        # This would normally be caught when writerow is called
145                                        raise CsvDialectException("Field is None")
146                                    if field.startswith("author"):
147                                        if filtering == "anonymise":
148                                            item[field] = "REDACTED"
149                                        elif filtering == "pseudonymise":
150                                            item[field] = hash_cache.update_cache(value)
151
152                            writer.writerow(item)
153                        except ValueError as e:
154                            if not possible_dialects:
155                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
156                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
157                                                                      "format or edited the CSV after exporting? Try importing "
158                                                                      "as custom format.")
159                            else:
160                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")
161
162                        done += 1
163
164                except import_formats.InvalidCustomFormat as e:
165                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
166                                     "log for details.")
167                    infile.close()
168                    temp_file.unlink()
169                    return self.dataset.finish_with_error(str(e))
170
171                except UnicodeDecodeError:
172                    infile.close()
173                    temp_file.unlink()
174                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
175                                                          "Make sure the file is encoded properly and try again.")
176
177                except CsvDialectException:
178                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
179                    continue
180
181            # done!
182            infile.close()
183            # We successfully read the CSV, no need to try other dialects
184            break
185
186        if skipped or timestamp_missing:
187            error_message = ""
188            if timestamp_missing:
189                error_message += f"{timestamp_missing:,} items had no timestamp"
190            if skipped:
191                error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"
192            
193            self.dataset.update_status(
194                f"CSV file imported, but {error_message}. See dataset log for details.",
195                is_final=True)
196
197        temp_file.unlink()
198        self.dataset.delete_parameter("filename")
199        if skipped and not done:
200            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
201                                           "the item's timestamp may be in a format that cannot be parsed properly.")
202        else:
203            self.dataset.finish(done)
204
205    def validate_query(query, request, config):
206        """
207        Validate custom data input
208
209        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
210        some metadata.
211
212        :param dict query:  Query parameters, from client-side.
213        :param request:  Flask request
214        :param ConfigManager|None config:  Configuration reader (context-aware)
215        :return dict:  Safe query parameters
216        """
217        # do we have an uploaded file?
218        if "option-data_upload" not in request.files:
219            raise QueryParametersException("No file was offered for upload.")
220
221        file = request.files["option-data_upload"]
222        if not file:
223            raise QueryParametersException("No file was offered for upload.")
224
225        if query.get("format") not in import_formats.tools:
226            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")
227
228        # content_length seems unreliable, so figure out the length by reading
229        # the file...
230        upload_size = 0
231        while True:
232            bit = file.read(1024)
233            if len(bit) == 0:
234                break
235            upload_size += len(bit)
236
237        file.seek(0)
238        encoding = sniff_encoding(file)
239        tool_format = import_formats.tools.get(query.get("format"))
240
241        try:
242            # try reading the file as csv here
243            # never read more than 128 kB (to keep it quick)
244            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
245            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
246            sample = wrapped_file.read(sample_size)
247
248            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
249                # this may be intended, or the check may be bad, so allow user to continue
250                raise QueryNeedsExplicitConfirmationException(
251                    "The uploaded file does not seem to have a header row. Continue anyway?")
252
253            wrapped_file.seek(0)
254            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
255
256            # override the guesses for specific formats if defined so in
257            # import_formats.py
258            for prop in tool_format.get("csv_dialect", {}):
259                setattr(dialect, prop, tool_format["csv_dialect"][prop])
260
261        except UnicodeDecodeError:
262            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
263                                           "Save the file in the proper format and try again.")
264        except csv.Error:
265            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
266
267        # With validated csvs, save as is but make sure the raw file is sorted
268        reader = csv.DictReader(wrapped_file, dialect=dialect)
269
270        # we know that the CSV file is a CSV file now, next verify whether
271        # we know what each column means
272        try:
273            fields = reader.fieldnames
274        except UnicodeDecodeError:
275            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
276
277        incomplete_mapping = list(tool_format["columns"])
278        for field in tool_format["columns"]:
279            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
280                incomplete_mapping.remove(field)
281            elif not tool_format.get("allow_user_mapping", False) and field in fields:
282                incomplete_mapping.remove(field)
283
284        # offer the user a number of select boxes where they can indicate the
285        # mapping for each column
286        column_mapping = {}
287        if tool_format.get("allow_user_mapping", False):
288            magic_mappings = {
289                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
290                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
291                "empty": {"__4cat_empty_value": "[empty]"},
292                "timestamp": {"__4cat_now": "[current date and time]"}
293            }
294            if incomplete_mapping:
295                raise QueryNeedsFurtherInputException({
296                    "mapping-info": {
297                        "type": UserInput.OPTION_INFO,
298                        "help": "Please confirm which column in the CSV file maps to each required value."
299                    },
300                    **{
301                        "mapping-%s" % mappable_column: {
302                            "type": UserInput.OPTION_CHOICE,
303                            "options": {
304                                "": "",
305                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
306                                **{column: column for column in fields}
307                            },
308                            "default": mappable_column if mappable_column in fields else "",
309                            "help": mappable_column,
310                            "tooltip": tool_format["columns"][mappable_column]
311                        } for mappable_column in incomplete_mapping
312                    }})
313
314            # the mappings do need to point to a column in the csv file
315            missing_mapping = []
316            for field in tool_format["columns"]:
317                mapping_field = "option-mapping-%s" % field
318                provided_field = request.form.get(mapping_field)
319                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
320                    missing_mapping.append(field)
321                else:
322                    column_mapping["mapping-" + field] = request.form.get(mapping_field)
323
324            if missing_mapping:
325                raise QueryParametersException(
326                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
327                    "columns: %s" % ", ".join(missing_mapping))
328
329        elif incomplete_mapping:
330            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
331                                           "are missing: %s" % ", ".join(incomplete_mapping))
332
333        # the timestamp column needs to be parseable
334        timestamp_column = request.form.get("mapping-timestamp")
335        try:
336            row = reader.__next__()
337            if timestamp_column not in row:
338                # incomplete row because we are analysing a sample
339                # stop parsing because no complete rows will follow
340                raise StopIteration
341
342            if row[timestamp_column]:
343                try:
344                    if row[timestamp_column].isdecimal():
345                        datetime.fromtimestamp(float(row[timestamp_column]))
346                    else:
347                        parse_datetime(row[timestamp_column])
348                except (ValueError, OSError):
349                    raise QueryParametersException(
350                        "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
351            else:
352                # the timestamp column is empty or contains empty values
353                if not query.get("frontend-confirm"):
354                    # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
355                    # TODO: frontend-confirm exceptions need to be made unique
356                    raise QueryNeedsExplicitConfirmationException(
357                        "Your 'timestamp' column contains empty values. Continue anyway?")
358                else:
359                    # `None` value will be used
360                    pass
361
362        except StopIteration:
363            pass
364
365        # ok, we're done with the file
366        wrapped_file.detach()
367
368        # Whether to strip the HTML tags
369        strip_html = False
370        if query.get("strip_html"):
371            strip_html = True
372
373        # return metadata - the filename is sanitised and serves no purpose at
374        # this point in time, but can be used to uniquely identify a dataset
375        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
376        return {
377            "filename": disallowed_characters.sub("", file.filename),
378            "time": time.time(),
379            "datasource": "upload",
380            "board": query.get("format", "custom").replace("_", "-"),
381            "format": query.get("format"),
382            "strip_html": strip_html,
383            **column_mapping,
384        }
385
386    def after_create(query, dataset, request):
387        """
388        Hook to execute after the dataset for this source has been created
389
390        In this case, put the file in a temporary location so it can be
391        processed properly by the related Job later.
392
393        :param dict query:  Sanitised query parameters
394        :param DataSet dataset:  Dataset created for this query
395        :param request:  Flask request submitted for its creation
396        """
397        file = request.files["option-data_upload"]
398        file.seek(0)
399        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
400            while True:
401                chunk = file.read(1024)
402                if len(chunk) == 0:
403                    break
404                outfile.write(chunk)

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an `is_compatible_with(module=None, config=None) -> bool` class method, which takes the dataset's module as an argument and returns a bool that determines whether this processor is considered compatible with that dataset. For example:

@classmethod
def is_compatible_with(cls, module=None, config=None):
    return module.type == "linguistic-features"
type = 'upload-search'
category = 'Search'
title = 'Custom Dataset Upload'
description = 'Upload your own CSV file to be used as a dataset'
extension = 'csv'
is_local = False
is_static = False
max_workers = 1
options = {'intro': {'type': 'info', 'help': 'You can upload a CSV or TAB file here that, after upload, will be available for further analysis and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must contain a header row.\n\nYou can indicate what format the file has or upload one with arbitrary structure. In the latter case, for each item, columns describing its ID, author, timestamp, and content are expected. You can select which column holds which value after uploading the file.'}, 'data_upload': {'type': 'file', 'help': 'File'}, 'format': {'type': 'choice', 'help': 'CSV format', 'options': {'instagram-crowdtangle': 'Instagram (via CrowdTangle export)', 'facebook-crowdtangle': 'Facebook (via CrowdTangle export)', 'facepager': 'Facebook (via Facepager export)', 'youtube_video_list': "YouTube videos (via YouTube Data Tools' Video List module)", 'youtube_comment_list': "YouTube comments (via YouTube Data Tools' Video Info module)", 'bazhuayu_weibo': 'Sina Weibo (via Bazhuayu)', 'custom': 'Custom/other'}, 'default': 'custom'}, 'strip_html': {'type': 'toggle', 'help': 'Strip HTML?', 'default': False, 'tooltip': "Removes HTML tags from the column identified as containing the item content ('body' by default)"}}
def process(self):
    def process(self):
        """
        Process uploaded CSV file

        Applies the provided mapping and makes sure the file is in a format
        4CAT will understand.

        Reads the staged ".importing" file (written by `after_create`), tries
        one or more candidate CSV dialects, maps each row via the tool
        format's mapper, optionally strips HTML and pseudonymises/anonymises
        author fields, and writes the result to the dataset's results path.
        """
        tool_format = import_formats.tools.get(self.parameters.get("format"))
        temp_file = self.dataset.get_results_path().with_suffix(".importing")
        with temp_file.open("rb") as infile:
            # detect encoding - UTF-8 with or without BOM
            encoding = sniff_encoding(infile)

        # figure out the csv dialect
        # the sniffer is not perfect and sometimes makes mistakes
        # for some formats we already know the dialect, so we can override its
        # guess and set the properties as defined in import_formats.py
        infile = temp_file.open("r", encoding=encoding)
        sample = infile.read(1024 * 1024)
        try:
            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
        except csv.Error:
            # sniffing failed; fall back to trying every registered dialect
            # (these are dialect *names*, which DictReader also accepts)
            possible_dialects = csv.list_dialects()
        if tool_format.get("csv_dialect", {}):
            # Known dialects are defined in import_formats.py
            # appended last, so it is popped (and thus tried) first below
            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
            for prop in tool_format.get("csv_dialect", {}):
                setattr(dialect, prop, tool_format["csv_dialect"][prop])
            possible_dialects.append(dialect)

        # try candidate dialects from most to least likely; a failed attempt
        # re-raises as CsvDialectException, which restarts this loop
        while possible_dialects:
            # With validated csvs, save as is but make sure the raw file is sorted
            infile.seek(0)
            dialect = possible_dialects.pop() # Use the last dialect first
            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) is csv.Dialect else dialect}")
            reader = csv.DictReader(infile, dialect=dialect)

            # formats without user mapping must contain every expected column
            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
                    set(tool_format["columns"]) != set(tool_format["columns"]):
                raise QueryParametersException("Not all columns are present")

            # hasher for pseudonymisation
            # fresh random salt per run, so pseudonyms are not comparable
            # across datasets
            salt = secrets.token_bytes(16)
            hasher = hashlib.blake2b(digest_size=24, salt=salt)
            hash_cache = HashCache(hasher)

            # write the resulting dataset
            writer = None  # created lazily, once the first item's keys are known
            done = 0
            skipped = 0
            timestamp_missing = 0
            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
                # mapper is defined in import_formats
                try:
                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
                        if isinstance(item, import_formats.InvalidImportedItem):
                            # if the mapper returns this class, the item is not written
                            skipped += 1
                            if hasattr(item, "reason"):
                                self.dataset.log(f"Skipping item ({item.reason})")
                            continue

                        if not writer:
                            # first valid item determines the output columns
                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
                            writer.writeheader()

                        if self.parameters.get("strip_html") and "body" in item:
                            item["body"] = strip_tags(item["body"])

                        # check for None/empty timestamp
                        if not item.get("timestamp"):
                            # Notify the user that items are missing a timestamp
                            timestamp_missing += 1
                            self.dataset.log(f"Item {i} ({item.get('id')}) has no timestamp.")

                        # pseudonymise or anonymise as needed
                        filtering = self.parameters.get("pseudonymise")
                        try:
                            if filtering:
                                for field, value in item.items():
                                    if field is None:
                                        # This would normally be caught when writerow is called
                                        # a None key indicates the dialect split rows wrongly
                                        raise CsvDialectException("Field is None")
                                    if field.startswith("author"):
                                        if filtering == "anonymise":
                                            item[field] = "REDACTED"
                                        elif filtering == "pseudonymise":
                                            item[field] = hash_cache.update_cache(value)

                            writer.writerow(item)
                        except ValueError as e:
                            if not possible_dialects:
                                # no dialects left to try; give up with a hint
                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
                                                                      "format or edited the CSV after exporting? Try importing "
                                                                      "as custom format.")
                            else:
                                # retry the whole file with the next dialect
                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")

                        done += 1

                except import_formats.InvalidCustomFormat as e:
                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
                                     "log for details.")
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error(str(e))

                except UnicodeDecodeError:
                    infile.close()
                    temp_file.unlink()
                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
                                                          "Make sure the file is encoded properly and try again.")

                except CsvDialectException:
                    # leaving the `with` block closes (and truncates on the
                    # next attempt) output_csv before retrying another dialect
                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
                    continue

            # done!
            infile.close()
            # We successfully read the CSV, no need to try other dialects
            break

        # surface partial failures to the user without aborting the import
        if skipped or timestamp_missing:
            error_message = ""
            if timestamp_missing:
                error_message += f"{timestamp_missing:,} items had no timestamp"
            if skipped:
                error_message += f"{' and ' if timestamp_missing else ''}{skipped:,} items were skipped because they could not be parsed or did not match the expected format"

            self.dataset.update_status(
                f"CSV file imported, but {error_message}. See dataset log for details.",
                is_final=True)

        temp_file.unlink()
        self.dataset.delete_parameter("filename")
        if skipped and not done:
            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
                                           "the item's timestamp may be in a format that cannot be parsed properly.")
        else:
            self.dataset.finish(done)

Process uploaded CSV file

Applies the provided mapping and makes sure the file is in a format 4CAT will understand.

def validate_query(query, request, config):
205    def validate_query(query, request, config):
206        """
207        Validate custom data input
208
209        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
210        some metadata.
211
212        :param dict query:  Query parameters, from client-side.
213        :param request:  Flask request
214        :param ConfigManager|None config:  Configuration reader (context-aware)
215        :return dict:  Safe query parameters
216        """
217        # do we have an uploaded file?
218        if "option-data_upload" not in request.files:
219            raise QueryParametersException("No file was offered for upload.")
220
221        file = request.files["option-data_upload"]
222        if not file:
223            raise QueryParametersException("No file was offered for upload.")
224
225        if query.get("format") not in import_formats.tools:
226            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")
227
228        # content_length seems unreliable, so figure out the length by reading
229        # the file...
230        upload_size = 0
231        while True:
232            bit = file.read(1024)
233            if len(bit) == 0:
234                break
235            upload_size += len(bit)
236
237        file.seek(0)
238        encoding = sniff_encoding(file)
239        tool_format = import_formats.tools.get(query.get("format"))
240
241        try:
242            # try reading the file as csv here
243            # never read more than 128 kB (to keep it quick)
244            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
245            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
246            sample = wrapped_file.read(sample_size)
247
248            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
249                # this may be intended, or the check may be bad, so allow user to continue
250                raise QueryNeedsExplicitConfirmationException(
251                    "The uploaded file does not seem to have a header row. Continue anyway?")
252
253            wrapped_file.seek(0)
254            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
255
256            # override the guesses for specific formats if defined so in
257            # import_formats.py
258            for prop in tool_format.get("csv_dialect", {}):
259                setattr(dialect, prop, tool_format["csv_dialect"][prop])
260
261        except UnicodeDecodeError:
262            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
263                                           "Save the file in the proper format and try again.")
264        except csv.Error:
265            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
266
267        # With validated csvs, save as is but make sure the raw file is sorted
268        reader = csv.DictReader(wrapped_file, dialect=dialect)
269
270        # we know that the CSV file is a CSV file now, next verify whether
271        # we know what each column means
272        try:
273            fields = reader.fieldnames
274        except UnicodeDecodeError:
275            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
276
277        incomplete_mapping = list(tool_format["columns"])
278        for field in tool_format["columns"]:
279            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
280                incomplete_mapping.remove(field)
281            elif not tool_format.get("allow_user_mapping", False) and field in fields:
282                incomplete_mapping.remove(field)
283
284        # offer the user a number of select boxes where they can indicate the
285        # mapping for each column
286        column_mapping = {}
287        if tool_format.get("allow_user_mapping", False):
288            magic_mappings = {
289                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
290                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
291                "empty": {"__4cat_empty_value": "[empty]"},
292                "timestamp": {"__4cat_now": "[current date and time]"}
293            }
294            if incomplete_mapping:
295                raise QueryNeedsFurtherInputException({
296                    "mapping-info": {
297                        "type": UserInput.OPTION_INFO,
298                        "help": "Please confirm which column in the CSV file maps to each required value."
299                    },
300                    **{
301                        "mapping-%s" % mappable_column: {
302                            "type": UserInput.OPTION_CHOICE,
303                            "options": {
304                                "": "",
305                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
306                                **{column: column for column in fields}
307                            },
308                            "default": mappable_column if mappable_column in fields else "",
309                            "help": mappable_column,
310                            "tooltip": tool_format["columns"][mappable_column]
311                        } for mappable_column in incomplete_mapping
312                    }})
313
314            # the mappings do need to point to a column in the csv file
315            missing_mapping = []
316            for field in tool_format["columns"]:
317                mapping_field = "option-mapping-%s" % field
318                provided_field = request.form.get(mapping_field)
319                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
320                    missing_mapping.append(field)
321                else:
322                    column_mapping["mapping-" + field] = request.form.get(mapping_field)
323
324            if missing_mapping:
325                raise QueryParametersException(
326                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
327                    "columns: %s" % ", ".join(missing_mapping))
328
329        elif incomplete_mapping:
330            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
331                                           "are missing: %s" % ", ".join(incomplete_mapping))
332
333        # the timestamp column needs to be parseable
334        timestamp_column = request.form.get("mapping-timestamp")
335        try:
336            row = reader.__next__()
337            if timestamp_column not in row:
338                # incomplete row because we are analysing a sample
339                # stop parsing because no complete rows will follow
340                raise StopIteration
341
342            if row[timestamp_column]:
343                try:
344                    if row[timestamp_column].isdecimal():
345                        datetime.fromtimestamp(float(row[timestamp_column]))
346                    else:
347                        parse_datetime(row[timestamp_column])
348                except (ValueError, OSError):
349                    raise QueryParametersException(
350                        "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
351            else:
352                # the timestamp column is empty or contains empty values
353                if not query.get("frontend-confirm"):
354                    # TODO: THIS never triggers! frontend-confirm is already set when columns are mapped
355                    # TODO: frontend-confirm exceptions need to be made unique
356                    raise QueryNeedsExplicitConfirmationException(
357                        "Your 'timestamp' column contains empty values. Continue anyway?")
358                else:
359                    # `None` value will be used
360                    pass
361
362        except StopIteration:
363            pass
364
365        # ok, we're done with the file
366        wrapped_file.detach()
367
368        # Whether to strip the HTML tags
369        strip_html = False
370        if query.get("strip_html"):
371            strip_html = True
372
373        # return metadata - the filename is sanitised and serves no purpose at
374        # this point in time, but can be used to uniquely identify a dataset
375        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
376        return {
377            "filename": disallowed_characters.sub("", file.filename),
378            "time": time.time(),
379            "datasource": "upload",
380            "board": query.get("format", "custom").replace("_", "-"),
381            "format": query.get("format"),
382            "strip_html": strip_html,
383            **column_mapping,
384        }

Validate custom data input

Confirms that the uploaded file is a valid CSV or tab file and, if so, returns some metadata.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Safe query parameters

def after_create(query, dataset, request):
386    def after_create(query, dataset, request):
387        """
388        Hook to execute after the dataset for this source has been created
389
390        In this case, put the file in a temporary location so it can be
391        processed properly by the related Job later.
392
393        :param dict query:  Sanitised query parameters
394        :param DataSet dataset:  Dataset created for this query
395        :param request:  Flask request submitted for its creation
396        """
397        file = request.files["option-data_upload"]
398        file.seek(0)
399        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
400            while True:
401                chunk = file.read(1024)
402                if len(chunk) == 0:
403                    break
404                outfile.write(chunk)

Hook to execute after the dataset for this source has been created

In this case, put the file in a temporary location so it can be processed properly by the related Job later.

Parameters
  • dict query: Sanitised query parameters
  • DataSet dataset: Dataset created for this query
  • request: Flask request submitted for its creation