Edit on GitHub

datasources.upload.import_csv

Custom data upload to create bespoke datasets

  1"""
  2Custom data upload to create bespoke datasets
  3"""
  4import secrets
  5import hashlib
  6import time
  7import csv
  8import re
  9import io
 10
 11import datasources.upload.import_formats as import_formats
 12
 13from dateutil.parser import parse as parse_datetime
 14from datetime import datetime
 15
 16from backend.lib.processor import BasicProcessor
 17from common.lib.exceptions import QueryParametersException, QueryNeedsFurtherInputException, \
 18    QueryNeedsExplicitConfirmationException, CsvDialectException
 19from common.lib.helpers import strip_tags, sniff_encoding, UserInput, HashCache
 20
 21
 22class SearchCustom(BasicProcessor):
 23    type = "upload-search"  # job ID
 24    category = "Search"  # category
 25    title = "Custom Dataset Upload"  # title displayed in UI
 26    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
 27    extension = "csv"  # extension of result file, used internally and in UI
 28    is_local = False  # Whether this datasource is locally scraped
 29    is_static = False  # Whether this datasource is still updated
 30
 31    max_workers = 1
 32    options = {
 33        "intro": {
 34            "type": UserInput.OPTION_INFO,
 35            "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
 36                    "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
 37                    "contain a header row.\n\n"
 38                    "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
 39                    "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
 40                    "can select which column holds which value after uploading the file."
 41        },
 42        "data_upload": {
 43            "type": UserInput.OPTION_FILE,
 44            "help": "File"
 45        },
 46        "format": {
 47            "type": UserInput.OPTION_CHOICE,
 48            "help": "CSV format",
 49            "options": {
 50                tool: info["name"] for tool, info in import_formats.tools.items()
 51            },
 52            "default": "custom"
 53        },
 54        "strip_html": {
 55            "type": UserInput.OPTION_TOGGLE,
 56            "help": "Strip HTML?",
 57            "default": False,
 58            "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
 59        }
 60    }
 61
 62    def process(self):
 63        """
 64        Process uploaded CSV file
 65
 66        Applies the provided mapping and makes sure the file is in a format
 67        4CAT will understand.
 68        """
 69        tool_format = import_formats.tools.get(self.parameters.get("format"))
 70        temp_file = self.dataset.get_results_path().with_suffix(".importing")
 71        with temp_file.open("rb") as infile:
 72            # detect encoding - UTF-8 with or without BOM
 73            encoding = sniff_encoding(infile)
 74
 75        # figure out the csv dialect
 76        # the sniffer is not perfect and sometimes makes mistakes
 77        # for some formats we already know the dialect, so we can override its
 78        # guess and set the properties as defined in import_formats.py
 79        infile = temp_file.open("r", encoding=encoding)
 80        sample = infile.read(1024 * 1024)
 81        try:
 82            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
 83        except csv.Error:
 84            possible_dialects = csv.list_dialects()
 85        if tool_format.get("csv_dialect", {}):
 86            # Known dialects are defined in import_formats.py
 87            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 88            for prop in tool_format.get("csv_dialect", {}):
 89                setattr(dialect, prop, tool_format["csv_dialect"][prop])
 90            possible_dialects.append(dialect)
 91
 92        while possible_dialects:
 93            # With validated csvs, save as is but make sure the raw file is sorted
 94            infile.seek(0)
 95            dialect = possible_dialects.pop() # Use the last dialect first
 96            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) == csv.Dialect else dialect}")
 97            reader = csv.DictReader(infile, dialect=dialect)
 98
 99            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
100                    set(tool_format["columns"]) != set(tool_format["columns"]):
101                raise QueryParametersException("Not all columns are present")
102
103            # hasher for pseudonymisation
104            salt = secrets.token_bytes(16)
105            hasher = hashlib.blake2b(digest_size=24, salt=salt)
106            hash_cache = HashCache(hasher)
107
108            # write the resulting dataset
109            writer = None
110            done = 0
111            skipped = 0
112            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
113                # mapper is defined in import_formats
114                try:
115                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
116                        if isinstance(item, import_formats.InvalidImportedItem):
117                            # if the mapper returns this class, the item is not written
118                            skipped += 1
119                            if hasattr(item, "reason"):
120                                self.dataset.log(f"Skipping item ({item.reason})")
121                            continue
122
123                        if not writer:
124                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
125                            writer.writeheader()
126
127                        if self.parameters.get("strip_html") and "body" in item:
128                            item["body"] = strip_tags(item["body"])
129
130                        # pseudonymise or anonymise as needed
131                        filtering = self.parameters.get("pseudonymise")
132                        try:
133                            if filtering:
134                                for field, value in item.items():
135                                    if field is None:
136                                        # This would normally be caught when writerow is called
137                                        raise CsvDialectException("Field is None")
138                                    if field.startswith("author"):
139                                        if filtering == "anonymise":
140                                            item[field] = "REDACTED"
141                                        elif filtering == "pseudonymise":
142                                            item[field] = hash_cache.update_cache(value)
143
144                            writer.writerow(item)
145                        except ValueError as e:
146                            if not possible_dialects:
147                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
148                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
149                                                                      "format or edited the CSV after exporting? Try importing "
150                                                                      "as custom format.")
151                            else:
152                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")
153
154                        done += 1
155
156                except import_formats.InvalidCustomFormat as e:
157                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
158                                     "log for details.")
159                    infile.close()
160                    temp_file.unlink()
161                    return self.dataset.finish_with_error(str(e))
162
163                except UnicodeDecodeError as e:
164                    infile.close()
165                    temp_file.unlink()
166                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
167                                                          "Make sure the file is encoded properly and try again.")
168
169                except CsvDialectException as e:
170                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
171                    continue
172
173            # done!
174            infile.close()
175            # We successfully read the CSV, no need to try other dialects
176            break
177
178        if skipped:
179            self.dataset.update_status(
180                f"CSV file imported, but {skipped:,} items were skipped because their date could not be parsed.",
181                is_final=True)
182
183        temp_file.unlink()
184        self.dataset.delete_parameter("filename")
185        if skipped and not done:
186            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
187                                           "the item's timestamp may be in a format that cannot be parsed properly.")
188        else:
189            self.dataset.finish(done)
190
191    def validate_query(query, request, user):
192        """
193        Validate custom data input
194
195        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
196        some metadata.
197
198        :param dict query:  Query parameters, from client-side.
199        :param request:  Flask request
200        :param User user:  User object of user who has submitted the query
201        :return dict:  Safe query parameters
202        """
203        # do we have an uploaded file?
204        if "option-data_upload" not in request.files:
205            raise QueryParametersException("No file was offered for upload.")
206
207        file = request.files["option-data_upload"]
208        if not file:
209            raise QueryParametersException("No file was offered for upload.")
210
211        if query.get("format") not in import_formats.tools:
212            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")
213
214        # content_length seems unreliable, so figure out the length by reading
215        # the file...
216        upload_size = 0
217        while True:
218            bit = file.read(1024)
219            if len(bit) == 0:
220                break
221            upload_size += len(bit)
222
223        file.seek(0)
224        encoding = sniff_encoding(file)
225        tool_format = import_formats.tools.get(query.get("format"))
226
227        try:
228            # try reading the file as csv here
229            # never read more than 128 kB (to keep it quick)
230            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
231            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
232            sample = wrapped_file.read(sample_size)
233
234            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
235                # this may be intended, or the check may be bad, so allow user to continue
236                raise QueryNeedsExplicitConfirmationException(
237                    "The uploaded file does not seem to have a header row. Continue anyway?")
238
239            wrapped_file.seek(0)
240            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
241
242            # override the guesses for specific formats if defined so in
243            # import_formats.py
244            for prop in tool_format.get("csv_dialect", {}):
245                setattr(dialect, prop, tool_format["csv_dialect"][prop])
246
247        except UnicodeDecodeError as e:
248            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
249                                           "Save the file in the proper format and try again.")
250        except csv.Error:
251            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
252
253        # With validated csvs, save as is but make sure the raw file is sorted
254        reader = csv.DictReader(wrapped_file, dialect=dialect)
255
256        # we know that the CSV file is a CSV file now, next verify whether
257        # we know what each column means
258        try:
259            fields = reader.fieldnames
260        except UnicodeDecodeError:
261            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
262
263        incomplete_mapping = list(tool_format["columns"])
264        for field in tool_format["columns"]:
265            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
266                incomplete_mapping.remove(field)
267            elif not tool_format.get("allow_user_mapping", False) and field in fields:
268                incomplete_mapping.remove(field)
269
270        # offer the user a number of select boxes where they can indicate the
271        # mapping for each column
272        column_mapping = {}
273        if tool_format.get("allow_user_mapping", False):
274            magic_mappings = {
275                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
276                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
277                "empty": {"__4cat_empty_value": "[empty]"},
278                "timestamp": {"__4cat_now": "[current date and time]"}
279            }
280            if incomplete_mapping:
281                raise QueryNeedsFurtherInputException({
282                    "mapping-info": {
283                        "type": UserInput.OPTION_INFO,
284                        "help": "Please confirm which column in the CSV file maps to each required value."
285                    },
286                    **{
287                        "mapping-%s" % mappable_column: {
288                            "type": UserInput.OPTION_CHOICE,
289                            "options": {
290                                "": "",
291                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
292                                **{column: column for column in fields}
293                            },
294                            "default": mappable_column if mappable_column in fields else "",
295                            "help": mappable_column,
296                            "tooltip": tool_format["columns"][mappable_column]
297                        } for mappable_column in incomplete_mapping
298                    }})
299
300            # the mappings do need to point to a column in the csv file
301            missing_mapping = []
302            for field in tool_format["columns"]:
303                mapping_field = "option-mapping-%s" % field
304                provided_field = request.form.get(mapping_field)
305                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
306                    missing_mapping.append(field)
307                else:
308                    column_mapping["mapping-" + field] = request.form.get(mapping_field)
309
310            if missing_mapping:
311                raise QueryParametersException(
312                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
313                    "columns: %s" % ", ".join(missing_mapping))
314
315        elif incomplete_mapping:
316            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
317                                           "are missing: %s" % ", ".join(incomplete_mapping))
318
319        # the timestamp column needs to be parseable
320        timestamp_column = request.form.get("mapping-timestamp")
321        try:
322            row = reader.__next__()
323            if timestamp_column not in row:
324                # incomplete row because we are analysing a sample
325                # stop parsing because no complete rows will follow
326                raise StopIteration
327
328            try:
329                if row[timestamp_column].isdecimal():
330                    datetime.fromtimestamp(float(row[timestamp_column]))
331                else:
332                    parse_datetime(row[timestamp_column])
333            except (ValueError, OSError):
334                raise QueryParametersException(
335                    "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
336
337        except StopIteration:
338            pass
339
340        # ok, we're done with the file
341        wrapped_file.detach()
342
343        # Whether to strip the HTML tags
344        strip_html = False
345        if query.get("strip_html"):
346            strip_html = True
347
348        # return metadata - the filename is sanitised and serves no purpose at
349        # this point in time, but can be used to uniquely identify a dataset
350        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
351        return {
352            "filename": disallowed_characters.sub("", file.filename),
353            "time": time.time(),
354            "datasource": "upload",
355            "board": query.get("format", "custom").replace("_", "-"),
356            "format": query.get("format"),
357            "strip_html": strip_html,
358            **column_mapping,
359        }
360
361    def after_create(query, dataset, request):
362        """
363        Hook to execute after the dataset for this source has been created
364
365        In this case, put the file in a temporary location so it can be
366        processed properly by the related Job later.
367
368        :param dict query:  Sanitised query parameters
369        :param DataSet dataset:  Dataset created for this query
370        :param request:  Flask request submitted for its creation
371        """
372        file = request.files["option-data_upload"]
373        file.seek(0)
374        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
375            while True:
376                chunk = file.read(1024)
377                if len(chunk) == 0:
378                    break
379                outfile.write(chunk)
class SearchCustom(backend.lib.processor.BasicProcessor):
 23class SearchCustom(BasicProcessor):
 24    type = "upload-search"  # job ID
 25    category = "Search"  # category
 26    title = "Custom Dataset Upload"  # title displayed in UI
 27    description = "Upload your own CSV file to be used as a dataset"  # description displayed in UI
 28    extension = "csv"  # extension of result file, used internally and in UI
 29    is_local = False  # Whether this datasource is locally scraped
 30    is_static = False  # Whether this datasource is still updated
 31
 32    max_workers = 1
 33    options = {
 34        "intro": {
 35            "type": UserInput.OPTION_INFO,
 36            "help": "You can upload a CSV or TAB file here that, after upload, will be available for further analysis "
 37                    "and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must "
 38                    "contain a header row.\n\n"
 39                    "You can indicate what format the file has or upload one with arbitrary structure. In the latter "
 40                    "case, for each item, columns describing its ID, author, timestamp, and content are expected. You "
 41                    "can select which column holds which value after uploading the file."
 42        },
 43        "data_upload": {
 44            "type": UserInput.OPTION_FILE,
 45            "help": "File"
 46        },
 47        "format": {
 48            "type": UserInput.OPTION_CHOICE,
 49            "help": "CSV format",
 50            "options": {
 51                tool: info["name"] for tool, info in import_formats.tools.items()
 52            },
 53            "default": "custom"
 54        },
 55        "strip_html": {
 56            "type": UserInput.OPTION_TOGGLE,
 57            "help": "Strip HTML?",
 58            "default": False,
 59            "tooltip": "Removes HTML tags from the column identified as containing the item content ('body' by default)"
 60        }
 61    }
 62
 63    def process(self):
 64        """
 65        Process uploaded CSV file
 66
 67        Applies the provided mapping and makes sure the file is in a format
 68        4CAT will understand.
 69        """
 70        tool_format = import_formats.tools.get(self.parameters.get("format"))
 71        temp_file = self.dataset.get_results_path().with_suffix(".importing")
 72        with temp_file.open("rb") as infile:
 73            # detect encoding - UTF-8 with or without BOM
 74            encoding = sniff_encoding(infile)
 75
 76        # figure out the csv dialect
 77        # the sniffer is not perfect and sometimes makes mistakes
 78        # for some formats we already know the dialect, so we can override its
 79        # guess and set the properties as defined in import_formats.py
 80        infile = temp_file.open("r", encoding=encoding)
 81        sample = infile.read(1024 * 1024)
 82        try:
 83            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
 84        except csv.Error:
 85            possible_dialects = csv.list_dialects()
 86        if tool_format.get("csv_dialect", {}):
 87            # Known dialects are defined in import_formats.py
 88            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 89            for prop in tool_format.get("csv_dialect", {}):
 90                setattr(dialect, prop, tool_format["csv_dialect"][prop])
 91            possible_dialects.append(dialect)
 92
 93        while possible_dialects:
 94            # With validated csvs, save as is but make sure the raw file is sorted
 95            infile.seek(0)
 96            dialect = possible_dialects.pop() # Use the last dialect first
 97            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) == csv.Dialect else dialect}")
 98            reader = csv.DictReader(infile, dialect=dialect)
 99
100            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
101                    set(tool_format["columns"]) != set(tool_format["columns"]):
102                raise QueryParametersException("Not all columns are present")
103
104            # hasher for pseudonymisation
105            salt = secrets.token_bytes(16)
106            hasher = hashlib.blake2b(digest_size=24, salt=salt)
107            hash_cache = HashCache(hasher)
108
109            # write the resulting dataset
110            writer = None
111            done = 0
112            skipped = 0
113            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
114                # mapper is defined in import_formats
115                try:
116                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
117                        if isinstance(item, import_formats.InvalidImportedItem):
118                            # if the mapper returns this class, the item is not written
119                            skipped += 1
120                            if hasattr(item, "reason"):
121                                self.dataset.log(f"Skipping item ({item.reason})")
122                            continue
123
124                        if not writer:
125                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
126                            writer.writeheader()
127
128                        if self.parameters.get("strip_html") and "body" in item:
129                            item["body"] = strip_tags(item["body"])
130
131                        # pseudonymise or anonymise as needed
132                        filtering = self.parameters.get("pseudonymise")
133                        try:
134                            if filtering:
135                                for field, value in item.items():
136                                    if field is None:
137                                        # This would normally be caught when writerow is called
138                                        raise CsvDialectException("Field is None")
139                                    if field.startswith("author"):
140                                        if filtering == "anonymise":
141                                            item[field] = "REDACTED"
142                                        elif filtering == "pseudonymise":
143                                            item[field] = hash_cache.update_cache(value)
144
145                            writer.writerow(item)
146                        except ValueError as e:
147                            if not possible_dialects:
148                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
149                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
150                                                                      "format or edited the CSV after exporting? Try importing "
151                                                                      "as custom format.")
152                            else:
153                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")
154
155                        done += 1
156
157                except import_formats.InvalidCustomFormat as e:
158                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
159                                     "log for details.")
160                    infile.close()
161                    temp_file.unlink()
162                    return self.dataset.finish_with_error(str(e))
163
164                except UnicodeDecodeError as e:
165                    infile.close()
166                    temp_file.unlink()
167                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
168                                                          "Make sure the file is encoded properly and try again.")
169
170                except CsvDialectException as e:
171                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
172                    continue
173
174            # done!
175            infile.close()
176            # We successfully read the CSV, no need to try other dialects
177            break
178
179        if skipped:
180            self.dataset.update_status(
181                f"CSV file imported, but {skipped:,} items were skipped because their date could not be parsed.",
182                is_final=True)
183
184        temp_file.unlink()
185        self.dataset.delete_parameter("filename")
186        if skipped and not done:
187            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
188                                           "the item's timestamp may be in a format that cannot be parsed properly.")
189        else:
190            self.dataset.finish(done)
191
192    def validate_query(query, request, user):
193        """
194        Validate custom data input
195
196        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
197        some metadata.
198
199        :param dict query:  Query parameters, from client-side.
200        :param request:  Flask request
201        :param User user:  User object of user who has submitted the query
202        :return dict:  Safe query parameters
203        """
204        # do we have an uploaded file?
205        if "option-data_upload" not in request.files:
206            raise QueryParametersException("No file was offered for upload.")
207
208        file = request.files["option-data_upload"]
209        if not file:
210            raise QueryParametersException("No file was offered for upload.")
211
212        if query.get("format") not in import_formats.tools:
213            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")
214
215        # content_length seems unreliable, so figure out the length by reading
216        # the file...
217        upload_size = 0
218        while True:
219            bit = file.read(1024)
220            if len(bit) == 0:
221                break
222            upload_size += len(bit)
223
224        file.seek(0)
225        encoding = sniff_encoding(file)
226        tool_format = import_formats.tools.get(query.get("format"))
227
228        try:
229            # try reading the file as csv here
230            # never read more than 128 kB (to keep it quick)
231            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
232            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
233            sample = wrapped_file.read(sample_size)
234
235            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
236                # this may be intended, or the check may be bad, so allow user to continue
237                raise QueryNeedsExplicitConfirmationException(
238                    "The uploaded file does not seem to have a header row. Continue anyway?")
239
240            wrapped_file.seek(0)
241            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
242
243            # override the guesses for specific formats if defined so in
244            # import_formats.py
245            for prop in tool_format.get("csv_dialect", {}):
246                setattr(dialect, prop, tool_format["csv_dialect"][prop])
247
248        except UnicodeDecodeError as e:
249            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
250                                           "Save the file in the proper format and try again.")
251        except csv.Error:
252            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
253
254        # With validated csvs, save as is but make sure the raw file is sorted
255        reader = csv.DictReader(wrapped_file, dialect=dialect)
256
257        # we know that the CSV file is a CSV file now, next verify whether
258        # we know what each column means
259        try:
260            fields = reader.fieldnames
261        except UnicodeDecodeError:
262            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
263
264        incomplete_mapping = list(tool_format["columns"])
265        for field in tool_format["columns"]:
266            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
267                incomplete_mapping.remove(field)
268            elif not tool_format.get("allow_user_mapping", False) and field in fields:
269                incomplete_mapping.remove(field)
270
271        # offer the user a number of select boxes where they can indicate the
272        # mapping for each column
273        column_mapping = {}
274        if tool_format.get("allow_user_mapping", False):
275            magic_mappings = {
276                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
277                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
278                "empty": {"__4cat_empty_value": "[empty]"},
279                "timestamp": {"__4cat_now": "[current date and time]"}
280            }
281            if incomplete_mapping:
282                raise QueryNeedsFurtherInputException({
283                    "mapping-info": {
284                        "type": UserInput.OPTION_INFO,
285                        "help": "Please confirm which column in the CSV file maps to each required value."
286                    },
287                    **{
288                        "mapping-%s" % mappable_column: {
289                            "type": UserInput.OPTION_CHOICE,
290                            "options": {
291                                "": "",
292                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
293                                **{column: column for column in fields}
294                            },
295                            "default": mappable_column if mappable_column in fields else "",
296                            "help": mappable_column,
297                            "tooltip": tool_format["columns"][mappable_column]
298                        } for mappable_column in incomplete_mapping
299                    }})
300
301            # the mappings do need to point to a column in the csv file
302            missing_mapping = []
303            for field in tool_format["columns"]:
304                mapping_field = "option-mapping-%s" % field
305                provided_field = request.form.get(mapping_field)
306                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
307                    missing_mapping.append(field)
308                else:
309                    column_mapping["mapping-" + field] = request.form.get(mapping_field)
310
311            if missing_mapping:
312                raise QueryParametersException(
313                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
314                    "columns: %s" % ", ".join(missing_mapping))
315
316        elif incomplete_mapping:
317            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
318                                           "are missing: %s" % ", ".join(incomplete_mapping))
319
320        # the timestamp column needs to be parseable
321        timestamp_column = request.form.get("mapping-timestamp")
322        try:
323            row = reader.__next__()
324            if timestamp_column not in row:
325                # incomplete row because we are analysing a sample
326                # stop parsing because no complete rows will follow
327                raise StopIteration
328
329            try:
330                if row[timestamp_column].isdecimal():
331                    datetime.fromtimestamp(float(row[timestamp_column]))
332                else:
333                    parse_datetime(row[timestamp_column])
334            except (ValueError, OSError):
335                raise QueryParametersException(
336                    "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
337
338        except StopIteration:
339            pass
340
341        # ok, we're done with the file
342        wrapped_file.detach()
343
344        # Whether to strip the HTML tags
345        strip_html = False
346        if query.get("strip_html"):
347            strip_html = True
348
349        # return metadata - the filename is sanitised and serves no purpose at
350        # this point in time, but can be used to uniquely identify a dataset
351        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
352        return {
353            "filename": disallowed_characters.sub("", file.filename),
354            "time": time.time(),
355            "datasource": "upload",
356            "board": query.get("format", "custom").replace("_", "-"),
357            "format": query.get("format"),
358            "strip_html": strip_html,
359            **column_mapping,
360        }
361
362    def after_create(query, dataset, request):
363        """
364        Hook to execute after the dataset for this source has been created
365
366        In this case, put the file in a temporary location so it can be
367        processed properly by the related Job later.
368
369        :param dict query:  Sanitised query parameters
370        :param DataSet dataset:  Dataset created for this query
371        :param request:  Flask request submitted for its creation
372        """
373        file = request.files["option-data_upload"]
374        file.seek(0)
375        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
376            while True:
377                chunk = file.read(1024)
378                if len(chunk) == 0:
379                    break
380                outfile.write(chunk)

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an `is_compatible_with(module=None, user=None) -> bool` class method, which takes a module as argument and returns a bool indicating whether this processor is considered compatible with that dataset. For example:


    @classmethod
    def is_compatible_with(cls, module=None, user=None):
        return module.type == "linguistic-features"

type = 'upload-search'
category = 'Search'
title = 'Custom Dataset Upload'
description = 'Upload your own CSV file to be used as a dataset'
extension = 'csv'
is_local = False
is_static = False
max_workers = 1
options = {'intro': {'type': 'info', 'help': 'You can upload a CSV or TAB file here that, after upload, will be available for further analysis and processing. Files need to be [UTF-8](https://en.wikipedia.org/wiki/UTF-8)-encoded and must contain a header row.\n\nYou can indicate what format the file has or upload one with arbitrary structure. In the latter case, for each item, columns describing its ID, author, timestamp, and content are expected. You can select which column holds which value after uploading the file.'}, 'data_upload': {'type': 'file', 'help': 'File'}, 'format': {'type': 'choice', 'help': 'CSV format', 'options': {'instagram-crowdtangle': 'Instagram (via CrowdTangle export)', 'facebook-crowdtangle': 'Facebook (via CrowdTangle export)', 'facepager': 'Facebook (via Facepager export)', 'youtube_video_list': "YouTube videos (via YouTube Data Tools' Video List module)", 'youtube_comment_list': "YouTube comments (via YouTube Data Tools' Video Info module)", 'bazhuayu_weibo': 'Sina Weibo (via Bazhuayu)', 'custom': 'Custom/other'}, 'default': 'custom'}, 'strip_html': {'type': 'toggle', 'help': 'Strip HTML?', 'default': False, 'tooltip': "Removes HTML tags from the column identified as containing the item content ('body' by default)"}}
def process(self):
 63    def process(self):
 64        """
 65        Process uploaded CSV file
 66
 67        Applies the provided mapping and makes sure the file is in a format
 68        4CAT will understand.
 69        """
 70        tool_format = import_formats.tools.get(self.parameters.get("format"))
 71        temp_file = self.dataset.get_results_path().with_suffix(".importing")
 72        with temp_file.open("rb") as infile:
 73            # detect encoding - UTF-8 with or without BOM
 74            encoding = sniff_encoding(infile)
 75
 76        # figure out the csv dialect
 77        # the sniffer is not perfect and sometimes makes mistakes
 78        # for some formats we already know the dialect, so we can override its
 79        # guess and set the properties as defined in import_formats.py
 80        infile = temp_file.open("r", encoding=encoding)
 81        sample = infile.read(1024 * 1024)
 82        try:
 83            possible_dialects = [csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))]
 84        except csv.Error:
 85            possible_dialects = csv.list_dialects()
 86        if tool_format.get("csv_dialect", {}):
 87            # Known dialects are defined in import_formats.py
 88            dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 89            for prop in tool_format.get("csv_dialect", {}):
 90                setattr(dialect, prop, tool_format["csv_dialect"][prop])
 91            possible_dialects.append(dialect)
 92
 93        while possible_dialects:
 94            # With validated csvs, save as is but make sure the raw file is sorted
 95            infile.seek(0)
 96            dialect = possible_dialects.pop() # Use the last dialect first
 97            self.dataset.log(f"Importing CSV file with dialect: {vars(dialect) if type(dialect) == csv.Dialect else dialect}")
 98            reader = csv.DictReader(infile, dialect=dialect)
 99
100            if tool_format.get("columns") and not tool_format.get("allow_user_mapping") and set(reader.fieldnames) & \
101                    set(tool_format["columns"]) != set(tool_format["columns"]):
102                raise QueryParametersException("Not all columns are present")
103
104            # hasher for pseudonymisation
105            salt = secrets.token_bytes(16)
106            hasher = hashlib.blake2b(digest_size=24, salt=salt)
107            hash_cache = HashCache(hasher)
108
109            # write the resulting dataset
110            writer = None
111            done = 0
112            skipped = 0
113            with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as output_csv:
114                # mapper is defined in import_formats
115                try:
116                    for i, item in enumerate(tool_format["mapper"](reader, tool_format["columns"], self.dataset, self.parameters)):
117                        if isinstance(item, import_formats.InvalidImportedItem):
118                            # if the mapper returns this class, the item is not written
119                            skipped += 1
120                            if hasattr(item, "reason"):
121                                self.dataset.log(f"Skipping item ({item.reason})")
122                            continue
123
124                        if not writer:
125                            writer = csv.DictWriter(output_csv, fieldnames=list(item.keys()))
126                            writer.writeheader()
127
128                        if self.parameters.get("strip_html") and "body" in item:
129                            item["body"] = strip_tags(item["body"])
130
131                        # pseudonymise or anonymise as needed
132                        filtering = self.parameters.get("pseudonymise")
133                        try:
134                            if filtering:
135                                for field, value in item.items():
136                                    if field is None:
137                                        # This would normally be caught when writerow is called
138                                        raise CsvDialectException("Field is None")
139                                    if field.startswith("author"):
140                                        if filtering == "anonymise":
141                                            item[field] = "REDACTED"
142                                        elif filtering == "pseudonymise":
143                                            item[field] = hash_cache.update_cache(value)
144
145                            writer.writerow(item)
146                        except ValueError as e:
147                            if not possible_dialects:
148                                self.dataset.log(f"Error ({e}) writing item {i}: {item}")
149                                return self.dataset.finish_with_error("Could not parse CSV file. Have you selected the correct "
150                                                                      "format or edited the CSV after exporting? Try importing "
151                                                                      "as custom format.")
152                            else:
153                                raise CsvDialectException(f"Error ({e}) writing item {i}: {item}")
154
155                        done += 1
156
157                except import_formats.InvalidCustomFormat as e:
158                    self.log.warning(f"Unable to import improperly formatted file for {tool_format['name']}. See dataset "
159                                     "log for details.")
160                    infile.close()
161                    temp_file.unlink()
162                    return self.dataset.finish_with_error(str(e))
163
164                except UnicodeDecodeError as e:
165                    infile.close()
166                    temp_file.unlink()
167                    return self.dataset.finish_with_error("The uploaded file is not encoded with the UTF-8 character set. "
168                                                          "Make sure the file is encoded properly and try again.")
169
170                except CsvDialectException as e:
171                    self.dataset.log(f"Error with CSV dialect: {vars(dialect)}")
172                    continue
173
174            # done!
175            infile.close()
176            # We successfully read the CSV, no need to try other dialects
177            break
178
179        if skipped:
180            self.dataset.update_status(
181                f"CSV file imported, but {skipped:,} items were skipped because their date could not be parsed.",
182                is_final=True)
183
184        temp_file.unlink()
185        self.dataset.delete_parameter("filename")
186        if skipped and not done:
187            self.dataset.finish_with_error("No valid items could be found in the uploaded file. The column containing "
188                                           "the item's timestamp may be in a format that cannot be parsed properly.")
189        else:
190            self.dataset.finish(done)

Process uploaded CSV file

Applies the provided mapping and makes sure the file is in a format 4CAT will understand.

def validate_query(query, request, user):
192    def validate_query(query, request, user):
193        """
194        Validate custom data input
195
196        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
197        some metadata.
198
199        :param dict query:  Query parameters, from client-side.
200        :param request:  Flask request
201        :param User user:  User object of user who has submitted the query
202        :return dict:  Safe query parameters
203        """
204        # do we have an uploaded file?
205        if "option-data_upload" not in request.files:
206            raise QueryParametersException("No file was offered for upload.")
207
208        file = request.files["option-data_upload"]
209        if not file:
210            raise QueryParametersException("No file was offered for upload.")
211
212        if query.get("format") not in import_formats.tools:
213            raise QueryParametersException(f"Cannot import CSV from tool {query.get('format')}")
214
215        # content_length seems unreliable, so figure out the length by reading
216        # the file...
217        upload_size = 0
218        while True:
219            bit = file.read(1024)
220            if len(bit) == 0:
221                break
222            upload_size += len(bit)
223
224        file.seek(0)
225        encoding = sniff_encoding(file)
226        tool_format = import_formats.tools.get(query.get("format"))
227
228        try:
229            # try reading the file as csv here
230            # never read more than 128 kB (to keep it quick)
231            sample_size = min(upload_size, 128 * 1024)  # 128 kB is sent from the frontend at most
232            wrapped_file = io.TextIOWrapper(file, encoding=encoding)
233            sample = wrapped_file.read(sample_size)
234
235            if not csv.Sniffer().has_header(sample) and not query.get("frontend-confirm"):
236                # this may be intended, or the check may be bad, so allow user to continue
237                raise QueryNeedsExplicitConfirmationException(
238                    "The uploaded file does not seem to have a header row. Continue anyway?")
239
240            wrapped_file.seek(0)
241            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
242
243            # override the guesses for specific formats if defined so in
244            # import_formats.py
245            for prop in tool_format.get("csv_dialect", {}):
246                setattr(dialect, prop, tool_format["csv_dialect"][prop])
247
248        except UnicodeDecodeError as e:
249            raise QueryParametersException("The uploaded file does not seem to be a CSV file encoded with UTF-8. "
250                                           "Save the file in the proper format and try again.")
251        except csv.Error:
252            raise QueryParametersException("Uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
253
254        # With validated csvs, save as is but make sure the raw file is sorted
255        reader = csv.DictReader(wrapped_file, dialect=dialect)
256
257        # we know that the CSV file is a CSV file now, next verify whether
258        # we know what each column means
259        try:
260            fields = reader.fieldnames
261        except UnicodeDecodeError:
262            raise QueryParametersException("The uploaded file is not a well-formed, UTF 8-encoded CSV or TAB file.")
263
264        incomplete_mapping = list(tool_format["columns"])
265        for field in tool_format["columns"]:
266            if tool_format.get("allow_user_mapping", False) and "option-mapping-%s" % field in request.form:
267                incomplete_mapping.remove(field)
268            elif not tool_format.get("allow_user_mapping", False) and field in fields:
269                incomplete_mapping.remove(field)
270
271        # offer the user a number of select boxes where they can indicate the
272        # mapping for each column
273        column_mapping = {}
274        if tool_format.get("allow_user_mapping", False):
275            magic_mappings = {
276                "id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
277                "thread_id": {"__4cat_auto_sequence": "[generate sequential IDs]"},
278                "empty": {"__4cat_empty_value": "[empty]"},
279                "timestamp": {"__4cat_now": "[current date and time]"}
280            }
281            if incomplete_mapping:
282                raise QueryNeedsFurtherInputException({
283                    "mapping-info": {
284                        "type": UserInput.OPTION_INFO,
285                        "help": "Please confirm which column in the CSV file maps to each required value."
286                    },
287                    **{
288                        "mapping-%s" % mappable_column: {
289                            "type": UserInput.OPTION_CHOICE,
290                            "options": {
291                                "": "",
292                                **magic_mappings.get(mappable_column, magic_mappings["empty"]),
293                                **{column: column for column in fields}
294                            },
295                            "default": mappable_column if mappable_column in fields else "",
296                            "help": mappable_column,
297                            "tooltip": tool_format["columns"][mappable_column]
298                        } for mappable_column in incomplete_mapping
299                    }})
300
301            # the mappings do need to point to a column in the csv file
302            missing_mapping = []
303            for field in tool_format["columns"]:
304                mapping_field = "option-mapping-%s" % field
305                provided_field = request.form.get(mapping_field)
306                if (provided_field not in fields and not provided_field.startswith("__4cat")) or not provided_field:
307                    missing_mapping.append(field)
308                else:
309                    column_mapping["mapping-" + field] = request.form.get(mapping_field)
310
311            if missing_mapping:
312                raise QueryParametersException(
313                    "You need to indicate which column in the CSV file holds the corresponding value for the following "
314                    "columns: %s" % ", ".join(missing_mapping))
315
316        elif incomplete_mapping:
317            raise QueryParametersException("The CSV file does not contain all required columns. The following columns "
318                                           "are missing: %s" % ", ".join(incomplete_mapping))
319
320        # the timestamp column needs to be parseable
321        timestamp_column = request.form.get("mapping-timestamp")
322        try:
323            row = reader.__next__()
324            if timestamp_column not in row:
325                # incomplete row because we are analysing a sample
326                # stop parsing because no complete rows will follow
327                raise StopIteration
328
329            try:
330                if row[timestamp_column].isdecimal():
331                    datetime.fromtimestamp(float(row[timestamp_column]))
332                else:
333                    parse_datetime(row[timestamp_column])
334            except (ValueError, OSError):
335                raise QueryParametersException(
336                    "Your 'timestamp' column does not use a recognisable format (yyyy-mm-dd hh:mm:ss is recommended)")
337
338        except StopIteration:
339            pass
340
341        # ok, we're done with the file
342        wrapped_file.detach()
343
344        # Whether to strip the HTML tags
345        strip_html = False
346        if query.get("strip_html"):
347            strip_html = True
348
349        # return metadata - the filename is sanitised and serves no purpose at
350        # this point in time, but can be used to uniquely identify a dataset
351        disallowed_characters = re.compile(r"[^a-zA-Z0-9._+-]")
352        return {
353            "filename": disallowed_characters.sub("", file.filename),
354            "time": time.time(),
355            "datasource": "upload",
356            "board": query.get("format", "custom").replace("_", "-"),
357            "format": query.get("format"),
358            "strip_html": strip_html,
359            **column_mapping,
360        }

Validate custom data input

Confirms that the uploaded file is a valid CSV or tab file and, if so, returns some metadata.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters

def after_create(query, dataset, request):
362    def after_create(query, dataset, request):
363        """
364        Hook to execute after the dataset for this source has been created
365
366        In this case, put the file in a temporary location so it can be
367        processed properly by the related Job later.
368
369        :param dict query:  Sanitised query parameters
370        :param DataSet dataset:  Dataset created for this query
371        :param request:  Flask request submitted for its creation
372        """
373        file = request.files["option-data_upload"]
374        file.seek(0)
375        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
376            while True:
377                chunk = file.read(1024)
378                if len(chunk) == 0:
379                    break
380                outfile.write(chunk)

Hook to execute after the dataset for this source has been created

In this case, put the file in a temporary location so it can be processed properly by the related Job later.

Parameters
  • dict query: Sanitised query parameters
  • DataSet dataset: Dataset created for this query
  • request: Flask request submitted for its creation