
datasources.upload.import_formats

import datetime
import json
import re

from dateutil.parser import parse as parse_datetime
from common.lib.exceptions import ProcessorException
from common.lib.helpers import convert_to_int


class InvalidCustomFormat(ProcessorException):
    """
    Raised when an uploaded file does not match the expected format
    """
    pass


class InvalidImportedItem:
    """
    Generic data class that a mapper can yield to have the importer
    recognise an item as one that should not be written to the result
    CSV file
    """
    reason = ""

    def __init__(self, reason=""):
        self.reason = reason


def import_crowdtangle_instagram(reader, columns, dataset, parameters):
    """
    Import an export of a CrowdTangle Instagram list

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    # write to the result file
    hashtag = re.compile(r"#([^\s,.+=-]+)")
    usertag = re.compile(r"@([^\s,.+=-]+)")
    for item in reader:
        url = item["URL"]
        url = re.sub(r"/*$", "", url)

        post_id = url.split("/")[-1]
        caption = item["Description"]
        hashtags = hashtag.findall(caption)
        usertags = usertag.findall(caption)

        datestamp = " ".join(item["Post Created"].split(" ")[:-1])
        date = datetime.datetime.strptime(datestamp, "%Y-%m-%d %H:%M:%S")

        item = {
            "id": post_id,
            "thread_id": post_id,
            "parent_id": post_id,
            "body": caption if caption is not None else "",
            "author": item["User Name"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            "type": "picture" if item["Type"] == "Photo" else item["Type"].lower(),
            "url": item["URL"],
            "thumbnail_url": item["Photo"],
            "hashtags": ",".join(hashtags),
            "usertags": ",".join(usertags),
            "mentioned": "",
            "num_likes": item["Likes"],
            "num_comments": item["Comments"],
            "subject": item["Title"]
        }

        yield item


def import_crowdtangle_facebook(reader, columns, dataset, parameters):
    """
    Import an export of a CrowdTangle Facebook list

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    # write to the result file
    hashtag = re.compile(r"#([^\s,.+=-]+)")
    entity_name = "Page Name" if "Page Name" in reader.fieldnames else "Group Name"
    overperforming_column = None
    for item in reader:
        hashtags = hashtag.findall(item["Message"])
        try:
            date = datetime.datetime.strptime(" ".join(item["Post Created"].split(" ")[:2]), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            yield InvalidImportedItem(reason=f"Cannot parse date/time '{item['Post Created']}'; skipping post")
            continue

        is_from_elsewhere = item["Link"].find("https://www.facebook.com/" + item["User Name"]) < 0
        shared_page = item["Link"].split("/")[3] if is_from_elsewhere and item["Link"].find(
            "https://www.facebook.com/") == 0 else ""

        # this one is a handful
        # unicode in csv column names is no fun
        if not overperforming_column:
            overperforming_column = [c for c in item.keys() if "Overperforming" in c][0]

        overperforming = item.get(overperforming_column, "")

        item = {
            "id": item["URL"].split("/")[-1],
            "thread_id": item["URL"].split("/")[-1],
            "body": item["Message"],
            "author": item["User Name"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            "page_name": item[entity_name],
            "page_category": item["Page Category"],
            "page_top_country": item["Page Admin Top Country"],
            "page_description": item["Page Description"],
            "page_created": item["Page Created"],
            "page_likes": item["Likes at Posting"],
            "page_id": item["Facebook Id"],
            "page_followers": item["Followers at Posting"],
            "page_shared_from": shared_page,
            "type": item["Type"],
            "interactions": convert_to_int(re.sub(r"[^0-9]", "", item["Total Interactions"]), 0) if item[
                "Total Interactions"] else 0,
            "comments": item["Comments"],
            "shares": item["Shares"],
            "likes": item["Likes"],
            "likes_love": item["Love"],
            "likes_wow": item["Wow"],
            "likes_haha": item["Haha"],
            "likes_sad": item["Sad"],
            "likes_angry": item["Angry"],
            "likes_care": item["Care"],
            "views_post": item["Post Views"],
            "views_total": item["Total Views"],
            "views_total_crossposts": item["Total Views For All Crossposts"],
            "overperforming_score": overperforming,
            "video_length": "" if item["Video Length"] == "N/A" else item["Video Length"],
            "video_status": item["Video Share Status"],
            "video_own": "yes" if item["Is Video Owner?"] == "Yes" else "no",
            "url": item["URL"],
            "hashtags": ",".join(hashtags),
            "url_original": item["Final Link"] if item["Final Link"] else item["Link"],
            "body_image": item["Image Text"],
            "body_link": item["Link Text"],
            "body_description": item["Description"],
            "sponsor_id": item["Sponsor Id"],
            "sponsor_name": item["Sponsor Name"],
            "sponsor_category": item["Sponsor Category"]
        }

        yield item


def import_facepager(reader, columns, dataset, parameters):
    """
    Import a Facepager export

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    # write to the result file
    for item in reader:
        hashtags = json.loads(item["hashtags"])
        hashtags = [hashtag["name"] for hashtag in hashtags]

        item = {
            "id": item["id"],
            "thread_id": item["id"],
            "author": item["authorMeta.name"],
            "body": item["text"],
            "timestamp": datetime.datetime.utcfromtimestamp(int(item["createTime"])).strftime(
                '%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(item["createTime"]),
            "is_harmful": -1,
            "is_duet": -1,
            "music_name": item["musicMeta.musicName"],
            "music_id": item["musicMeta.musicId"],
            "music_author": item["musicMeta.musicAuthor"],
            "video_url": item["videoUrl"],
            "tiktok_url": "https://tiktok.com/@%s/video/%s" % (item["authorMeta.id"], item["id"]),
            "thumbnail_url": item["covers.default"],
            "amount_likes": item["diggCount"],
            "amount_comments": item["commentCount"],
            "amount_shares": item["shareCount"],
            "amount_plays": item["playCount"],
            "hashtags": ",".join(hashtags)
        }

        yield item


def import_ytdt_videolist(reader, columns, dataset, parameters):
    """
    Import a YouTube Data Tools Video List export

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    # write to the result file
    for item in reader:
        try:
            date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%dT%H:%M:%SZ")  # ex. 2022-11-11T05:30:01Z
        except ValueError:
            yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})")
            continue

        collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "")

        item = {
            "id": item.get('videoId'),
            "thread_id": item.get('channelId'),
            "author": item.get('channelTitle'),
            "body": item.get('videoDescription'),
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            **item,
            "source_filename": dataset.parameters.get("filename"),
            "date_collected": collection_date,
            "youtube_url": f"https://www.youtube.com/watch?v={item['videoId']}"
        }

        yield item


def import_ytdt_commentlist(reader, columns, dataset, parameters):
    """
    Import a YouTube Data Tools Video Info export

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    # write to the result file
    for item in reader:
        try:
            date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%d %H:%M:%S")  # ex. 2022-11-11 05:30:01
        except ValueError:
            yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})")
            continue

        collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "")

        item = {
            "id": item["id"],
            "thread_id": item["isReplyTo"] if item["isReplyTo"] else item["id"],
            "author": item["authorName"],
            "body": item["text"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            **item,
            "source_filename": dataset.parameters.get("filename"),
            "date_collected": collection_date,
        }

        yield item


def import_bzy_weibo(reader, columns, dataset, parameters):
    """
    Import Weibo items collected by Bazhuayu

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    index = 1
    year = datetime.datetime.now().year

    for item in reader:
        if "from1" not in item:
            raise InvalidCustomFormat("CSV does not appear to be Bazhuayu format for Sina Weibo; please try importing again with CSV format set to \"Custom/other\".")
        raw_timestamp = item["from1"].strip()
        timestamp_bits = re.split(r"[年月日\s:]+", raw_timestamp)

        if re.match(r"[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp):
            timestamp = datetime.datetime(year, int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]),
                                          int(timestamp_bits[3]))
        elif re.match(r"[0-9]{4}年[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp):
            timestamp = datetime.datetime(int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]),
                                          int(timestamp_bits[3]), int(timestamp_bits[4]))
        else:
            yield InvalidImportedItem(f"Cannot parse timestamp {raw_timestamp}")
            continue

        item = {
            "id": index,
            "thread_id": index,
            "author": item["标题"],
            "body": item["txt"],
            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "image_url": item["图片"],
            **item,
            "unix_timestamp": int(timestamp.timestamp())
        }

        index += 1
        yield item


def map_csv_items(reader, columns, dataset, parameters):
    """
    Read CSV items and put them in the 4CAT dataset file

    This version of the method mostly just copies the file, applying the
    supplied mapping where possible. It could alternatively apply more
    fancy mappings.

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped items
    """
    # write to the result file
    indexes = {}
    now_timestamp = str(int(datetime.datetime.now().timestamp()))
    for row in reader:
        mapped_row = {}
        for field in columns:
            mapping = parameters.get("mapping-" + field)
            if mapping:
                if mapping == "__4cat_auto_sequence":
                    # auto-numbering
                    if field not in indexes:
                        indexes[field] = 1
                    mapped_row[field] = indexes[field]
                    indexes[field] += 1
                elif mapping == "__4cat_empty_value":
                    mapped_row[field] = ""
                elif mapping == "__4cat_now":
                    mapped_row[field] = now_timestamp
                else:
                    # actual mapping
                    mapped_row[field] = row[mapping]

        # ensure that timestamp is YYYY-MM-DD HH:MM:SS and that there
        # is a unix timestamp. this will override the columns if they
        # already exist! but it is necessary for 4CAT to handle the
        # data in processors etc and should be an equivalent value.
        if not mapped_row.get("timestamp"):
            if mapped_row.get("unix_timestamp"):
                # if unix timestamp is given, convert to datetime
                try:
                    timestamp = datetime.datetime.fromtimestamp(int(mapped_row["unix_timestamp"]))
                    mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
                except (ValueError, OSError) as e:
                    yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['unix_timestamp']}')")
                    continue
            else:
                # no timestamp given, set to empty string
                mapped_row["timestamp"] = ""
                mapped_row["unix_timestamp"] = None

        else:
            try:
                if mapped_row["timestamp"].replace(".", "").isdecimal() and mapped_row["timestamp"].count(".") <= 1:  # ignore . for floats
                    timestamp = datetime.datetime.fromtimestamp(float(mapped_row["timestamp"]))
                else:
                    timestamp = parse_datetime(mapped_row["timestamp"])

                mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
                mapped_row["unix_timestamp"] = int(timestamp.timestamp())

            except (ValueError, OSError, AttributeError) as e:
                # skip rows without a valid timestamp - this may happen
                # despite validation because only a sample is validated
                # this is an OSError on Windows sometimes???
                yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['timestamp']}')")
                continue

        # this ensures that the required columns are always the first
        # columns, and the rest is in original order
        for field, value in row.items():
            if field not in mapped_row and field:
                mapped_row[field] = value

        yield mapped_row


# tools that are supported for importing
# defined here (instead of at the top) so we can refer to the functions
# defined above
# format: dictionary with keys name, columns, mapper
# name is a human-readable name for this format (e.g. a tool name)
# columns is a set of required columns in the uploaded csv
# mapper is a function that maps the uploaded CSV to 4CAT-compatible items
tools = {
    "instagram-crowdtangle": {
        "name": "Instagram (via CrowdTangle export)",
        "columns": {"Account", "User Name", "Followers at Posting", "Post Created", "Type", "Likes", "Comments",
                    "Views", "URL", "Link", "Photo", "Title", "Description"},
        "mapper": import_crowdtangle_instagram
    },
    "facebook-crowdtangle": {
        "name": "Facebook (via CrowdTangle export)",
        "columns": {"Page Name", "User Name", "Facebook Id", "Page Category", "Page Admin Top Country",
                    "Page Description", "Page Created", "Likes at Posting", "Followers at Posting", "Post Created",
                    "Post Created Date", "Post Created Time", "Type", "Total Interactions", "Likes", "Comments",
                    "Shares", "Love", "Wow", "Haha", "Sad", "Angry", "Care", "Video Share Status",
                    "Is Video Owner?", "Post Views", "Total Views", "Total Views For All Crossposts",
                    "Video Length", "URL", "Message", "Link", "Final Link", "Image Text", "Link Text",
                    "Description", "Sponsor Id", "Sponsor Name", "Sponsor Category"},
        "mapper": import_crowdtangle_facebook
    },
    "facepager": {
        "name": "Facebook (via Facepager export)",
        "columns": {"path", "id", "parent_id", "level", "object_id", "object_type", "query_status", "query_time",
                    "query_type", "from.name", "created_time", "type", "link", "picture", "full_picture", "",
                    "comments.summary.total_count", "shares.count", "reactions.summary.total_count",
                    "like.summary.total_count", "love.summary.total_count", "haha.summary.total_count",
                    "wow.summary.total_count", "sad.summary.total_count", "angry.summary.total_count", "message"},
        "mapper": import_facepager
    },
    "youtube_video_list": {
        "name": "YouTube videos (via YouTube Data Tools' Video List module)",
        "columns": {"publishedAt", "videoId", "channelId", "channelTitle", "videoDescription"},
        "mapper": import_ytdt_videolist,
        "csv_dialect": {"doublequote": True, "escapechar": "\\"},
    },
    "youtube_comment_list": {
        "name": "YouTube comments (via YouTube Data Tools' Video Info module)",
        "columns": {"id", "isReplyTo", "authorName", "text", "publishedAt"},
        "mapper": import_ytdt_commentlist,
        "csv_dialect": {"doublequote": True, "escapechar": "\\"},
    },
    "bazhuayu_weibo": {
        "name": "Sina Weibo (via Bazhuayu)",
        "columns": {},
        "mapper": import_bzy_weibo
    },
    "custom": {
        "name": "Custom/other",
        "columns": {
            "id": "A value that uniquely identifies the item, like a numerical ID.",
            "thread_id": "A value that uniquely identifies the sub-collection an item is a part of, e.g. a forum "
                         "thread. If this does not apply to your dataset you can use the same value as for 'id' "
                         "here.",
            "author": "A value that identifies the author of the item. If the option to pseudonymise data is "
                      "selected below, this field will be pseudonymised.",
            "body": "The 'content' of the item, e.g. a post's text.",
            "timestamp": "The time the item was made or posted. 4CAT will try to interpret this value, but for the "
                         "best results use YYYY-MM-DD HH:MM:SS notation."
        },
        "mapper": map_csv_items,
        "allow_user_mapping": True
    }
}
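Each mapper above follows the same generator protocol: it receives a csv.DictReader for the uploaded file, the iterable of required columns, the DataSet being imported into, and the dataset parameters, and it yields one dictionary per item. Yielding an InvalidImportedItem instead tells the importer to skip that row and record the reason. A minimal sketch of a mapper written against this protocol; the "message" and "author" source columns are hypothetical:

def import_minimal_example(reader, columns, dataset, parameters):
    # yields a dict per usable row, InvalidImportedItem for skipped rows
    for index, row in enumerate(reader, start=1):
        if not row.get("message"):  # hypothetical required field
            yield InvalidImportedItem(reason=f"Row {index} has no message; skipping")
            continue

        yield {
            "id": index,
            "thread_id": index,
            "author": row.get("author", ""),
            "body": row["message"]
        }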
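The tools dictionary ties these pieces together: each entry maps a format key to a human-readable name, the required columns, the mapper function and, for the YouTube Data Tools formats, csv dialect overrides. A sketch of how the registry might be driven, assuming the module-level names above are in scope; the file path and format key are illustrative, and the DataSet argument is stubbed with None, which only works for mappers (such as import_crowdtangle_instagram) that never read dataset.parameters:

import csv

def run_mapper(path, format_key):
    tool = tools[format_key]
    with open(path, encoding="utf-8") as infile:
        # apply the format's csv dialect overrides, if any
        reader = csv.DictReader(infile, **tool.get("csv_dialect", {}))

        # refuse files that lack the required columns
        missing = set(tool["columns"]) - set(reader.fieldnames or [])
        if missing:
            raise InvalidCustomFormat(f"Missing columns: {', '.join(sorted(missing))}")

        for item in tool["mapper"](reader, tool["columns"], None, {}):
            if isinstance(item, InvalidImportedItem):
                print(f"skipped: {item.reason}")
            else:
                print(item["id"], item["timestamp"])

run_mapper("crowdtangle_export.csv", "instagram-crowdtangle")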

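import_bzy_weibo recovers timestamps from Bazhuayu's from1 column, which uses Chinese date notation with or without a year. The 年/月/日 markers double as split points, so each numeric component lands in its own slot; a short illustration with a hypothetical value:

raw = "2021年04月19日 10:30"           # hypothetical from1 value
bits = re.split(r"[年月日\s:]+", raw)  # ['2021', '04', '19', '10', '30']
timestamp = datetime.datetime(int(bits[0]), int(bits[1]), int(bits[2]),
                              int(bits[3]), int(bits[4]))
# -> datetime.datetime(2021, 4, 19, 10, 30); a value without a year, such as
#    "04月19日 10:30", yields four components and the current year is assumed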

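For the "Custom/other" format, map_csv_items reads its column mapping from the dataset parameters: each required column has a mapping-<column> entry naming either a source column or one of the special values __4cat_auto_sequence (auto-numbering), __4cat_empty_value (empty string) or __4cat_now (the time of import). A sketch with hypothetical source columns, again stubbing the unused DataSet argument with None:

import csv
import io

raw = "username,content,posted_at\nalice,hello,2021-04-19 10:30:00\n"
reader = csv.DictReader(io.StringIO(raw))

parameters = {
    "mapping-id": "__4cat_auto_sequence",  # auto-number each row
    "mapping-thread_id": "__4cat_auto_sequence",
    "mapping-author": "username",          # map from a source column
    "mapping-body": "content",
    "mapping-timestamp": "posted_at",
}

columns = ("id", "thread_id", "author", "body", "timestamp")
for item in map_csv_items(reader, columns, None, parameters):
    print(item)
# -> {'id': 1, 'thread_id': 1, 'author': 'alice', 'body': 'hello',
#     'timestamp': '2021-04-19 10:30:00', 'unix_timestamp': ...,
#     plus the remaining original columns appended after the mapped ones}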
