datasources.upload.import_formats
import datetime
import json
import re

from dateutil.parser import parse as parse_datetime

from common.lib.exceptions import ProcessorException
from common.lib.helpers import convert_to_int


class InvalidCustomFormat(ProcessorException):
    """
    Raised when an uploaded file does not match the format it was claimed to be in
    """
    pass


class InvalidImportedItem:
    """
    Generic data class to pass to have the importer recognise an item as
    one that should not be written to the result CSV file

    ``reason`` optionally explains why the item was rejected, so the
    importer can report it to the user.
    """
    reason = ""

    def __init__(self, reason=""):
        self.reason = reason


def import_crowdtangle_instagram(reader, columns, dataset, parameters):
    """
    Import an export of a CrowdTangle Instagram list

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped item dicts, or InvalidImportedItem for
        rows that could not be parsed
    """
    hashtag = re.compile(r"#([^\s,.+=-]+)")
    usertag = re.compile(r"@([^\s,.+=-]+)")
    for item in reader:
        # strip trailing slashes so the post ID is the last path component
        url = re.sub(r"/*$", "", item["URL"])
        post_id = url.split("/")[-1]

        caption = item["Description"]
        hashtags = hashtag.findall(caption)
        usertags = usertag.findall(caption)

        # "Post Created" ends in a timezone label; drop it before parsing
        datestamp = " ".join(item["Post Created"].split(" ")[:-1])
        try:
            date = datetime.datetime.strptime(datestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # skip the unparseable row instead of aborting the whole import
            # (consistent with the other CrowdTangle importer below)
            yield InvalidImportedItem(reason=f"Cannot parse date/time '{item['Post Created']}'; skipping post")
            continue

        item = {
            "id": post_id,
            "thread_id": post_id,
            "parent_id": post_id,
            "body": caption if caption is not None else "",
            "author": item["User Name"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            "type": "picture" if item["Type"] == "Photo" else item["Type"].lower(),
            "url": item["URL"],
            "thumbnail_url": item["Photo"],
            "hashtags": ",".join(hashtags),
            "usertags": ",".join(usertags),
            "mentioned": "",
            "num_likes": item["Likes"],
            "num_comments": item["Comments"],
            "subject": item["Title"]
        }

        yield item


def import_crowdtangle_facebook(reader, columns, dataset, parameters):
    """
    Import an export of a CrowdTangle Facebook list

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped item dicts, or InvalidImportedItem for
        rows that could not be parsed
    """
    hashtag = re.compile(r"#([^\s,.+=-]+)")
    # group exports use "Group Name" where page exports use "Page Name"
    entity_name = "Page Name" if "Page Name" in reader.fieldnames else "Group Name"
    overperforming_column = None
    for item in reader:
        hashtags = hashtag.findall(item["Message"])
        try:
            date = datetime.datetime.strptime(" ".join(item["Post Created"].split(" ")[:2]), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            yield InvalidImportedItem(reason=f"Cannot parse date/time '{item['Post Created']}'; skipping post")
            # BUGFIX: without this continue, `date` would be undefined (or
            # stale from a previous row) when building the item below
            continue

        is_from_elsewhere = item["Link"].find("https://www.facebook.com/" + item["User Name"]) < 0
        shared_page = item["Link"].split("/")[3] if is_from_elsewhere and item["Link"].find(
            "https://www.facebook.com/") == 0 else ""

        # this one is a handful
        # unicode in csv column names is no fun; find the actual column name
        # once and re-use it for subsequent rows
        if not overperforming_column:
            overperforming_column = [c for c in item.keys() if "Overperforming" in c][0]

        overperforming = item.get(overperforming_column, "")

        item = {
            "id": item["URL"].split("/")[-1],
            "thread_id": item["URL"].split("/")[-1],
            "body": item["Message"],
            "author": item["User Name"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            "page_name": item[entity_name],
            "page_category": item["Page Category"],
            "page_top_country": item["Page Admin Top Country"],
            "page_description": item["Page Description"],
            "page_created": item["Page Created"],
            "page_likes": item["Likes at Posting"],
            "page_id": item["Facebook Id"],
            "page_followers": item["Followers at Posting"],
            "page_shared_from": shared_page,
            "type": item["Type"],
            "interactions": convert_to_int(re.sub(r"[^0-9]", "", item["Total Interactions"]), 0) if item[
                "Total Interactions"] else 0,
            "comments": item["Comments"],
            "shares": item["Shares"],
            "likes": item["Likes"],
            "likes_love": item["Love"],
            "likes_wow": item["Wow"],
            "likes_haha": item["Haha"],
            "likes_sad": item["Sad"],
            "likes_angry": item["Angry"],
            "likes_care": item["Care"],
            "views_post": item["Post Views"],
            "views_total": item["Total Views"],
            "views_total_crossposts": item["Total Views For All Crossposts"],
            "overperforming_score": overperforming,
            "video_length": "" if item["Video Length"] == "N/A" else item["Video Length"],
            "video_status": item["Video Share Status"],
            "video_own": "yes" if item["Is Video Owner?"] == "Yes" else "no",
            "url": item["URL"],
            "hashtags": ",".join(hashtags),
            "url_original": item["Final Link"] if item["Final Link"] else item["Link"],
            "body_image": item["Image Text"],
            "body_link": item["Link Text"],
            "body_description": item["Description"],
            "sponsor_id": item["Sponsor Id"],
            "sponsor_name": item["Sponsor Name"],
            "sponsor_category": item["Sponsor Category"]
        }

        yield item


def import_facepager(reader, columns, dataset, parameters):
    """
    Import an export of a Facepager export

    The column names suggest this maps TikTok data collected via Facepager.

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped item dicts
    """
    for item in reader:
        # "hashtags" column holds a JSON list of {"name": ...} objects
        hashtags = json.loads(item["hashtags"])
        hashtags = [hashtag["name"] for hashtag in hashtags]

        item = {
            "id": item["id"],
            "thread_id": item["id"],
            "author": item["authorMeta.name"],
            "body": item["text"],
            # timezone-aware equivalent of the deprecated utcfromtimestamp();
            # produces the same UTC wall-clock string
            "timestamp": datetime.datetime.fromtimestamp(
                int(item["createTime"]), tz=datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(item["createTime"]),
            "is_harmful": -1,
            "is_duet": -1,
            "music_name": item["musicMeta.musicName"],
            "music_id": item["musicMeta.musicId"],
            "music_author": item["musicMeta.musicAuthor"],
            "video_url": item["videoUrl"],
            "tiktok_url": "https://tiktok.com/@%s/video/%s" % (item["authorMeta.id"], item["id"]),
            "thumbnail_url": item["covers.default"],
            "amount_likes": item["diggCount"],
            "amount_comments": item["commentCount"],
            "amount_shares": item["shareCount"],
            "amount_plays": item["playCount"],
            "hashtags": ",".join(hashtags)
        }

        yield item


def import_ytdt_videolist(reader, columns, dataset, parameters):
    """
    Import an export of a YouTube Data Tools Video List export

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped item dicts, or InvalidImportedItem for
        rows that could not be parsed
    """
    for item in reader:
        try:
            date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%dT%H:%M:%SZ")  # ex. 2022-11-11T05:30:01Z
        except ValueError:
            yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})")
            continue

        # YTDT filenames embed the collection date after the second underscore
        collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "")

        item = {
            "id": item.get('videoId'),
            "thread_id": item.get('channelId'),
            "author": item.get('channelTitle'),
            "body": item.get('videoDescription'),
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            # keep all original columns as well; mapped keys above use
            # different names so they are not overwritten by this spread
            **item,
            "source_filename": dataset.parameters.get("filename"),
            "date_collected": collection_date,
            "youtube_url": f"https://www.youtube.com/watch?v={item['videoId']}"
        }

        yield item


def import_ytdt_commentlist(reader, columns, dataset, parameters):
    """
    Import an export of a YouTube Data Tools Video Info export

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped item dicts, or InvalidImportedItem for
        rows that could not be parsed
    """
    for item in reader:
        try:
            date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%d %H:%M:%S")  # ex. 2022-11-11 05:30:01
        except ValueError:
            yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})")
            continue

        # YTDT filenames embed the collection date after the second underscore
        collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "")

        item = {
            "id": item["id"],
            # top-level comments thread under themselves, replies under parent
            "thread_id": item["isReplyTo"] if item["isReplyTo"] else item["id"],
            "author": item["authorName"],
            "body": item["text"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            **item,
            "source_filename": dataset.parameters.get("filename"),
            "date_collected": collection_date,
        }

        yield item


def import_bzy_weibo(reader, columns, dataset, parameter):
    """
    Import Weibo item collected by Bazhuayu

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameter:  Dataset parameters
    :return generator:  Yields mapped item dicts, or InvalidImportedItem for
        rows that could not be parsed
    """
    index = 1
    # timestamps without a year (e.g. "07月15日 12:30") are assumed to be
    # from the current year
    year = datetime.datetime.now().year

    for item in reader:
        if "from1" not in item:
            raise InvalidCustomFormat("CSV does not appear to be Bazhuayu format for Sina Weibo; please try importing again with CSV format set to \"Custom/other\".")
        raw_timestamp = item["from1"].strip()
        timestamp_bits = re.split(r"[年月日\s:]+", raw_timestamp)

        if re.match(r"[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp):
            # month/day only: bits are [month, day, hour, minute]
            timestamp = datetime.datetime(year, int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]),
                                          int(timestamp_bits[3]))
        elif re.match(r"[0-9]{4}年[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp):
            # BUGFIX: pattern previously lacked the 年 separator and could
            # never match "YYYY年MM月DD日 HH:MM" timestamps, even though
            # timestamp_bits is split on 年
            timestamp = datetime.datetime(int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]),
                                          int(timestamp_bits[3]), int(timestamp_bits[4]))
        else:
            yield InvalidImportedItem(f"Cannot parse timestamp {raw_timestamp}")
            # BUGFIX: without this continue, `timestamp` would be undefined
            # (or stale from a previous row) when building the item below
            continue

        item = {
            "id": index,
            "thread_id": index,
            "author": item["标题"],
            "body": item["txt"],
            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "image_url": item["图片"],
            **item,
            "unix_timestamp": int(timestamp.timestamp())
        }

        index += 1
        yield item


def map_csv_items(reader, columns, dataset, parameters):
    """
    Read CSV items and put them in the 4CAT dataset file

    This version of the method mostly just copies the file, applying the
    supplied mapping where possible. It could alternatively apply more
    fancy mappings.

    :param csv.DictReader reader:  Reader object of input file
    :param Iterable columns:  Required columns
    :param DataSet dataset:  Dataset to import into
    :param dict parameters:  Dataset parameters
    :return generator:  Yields mapped item dicts, or InvalidImportedItem for
        rows that could not be parsed
    """
    # per-field counters for the auto-numbering pseudo-mapping
    indexes = {}
    now_timestamp = str(int(datetime.datetime.now().timestamp()))
    for row in reader:
        mapped_row = {}
        for field in columns:
            mapping = parameters.get("mapping-" + field)
            if mapping:
                if mapping == "__4cat_auto_sequence":
                    # auto-numbering
                    if field not in indexes:
                        indexes[field] = 1
                    mapped_row[field] = indexes[field]
                    indexes[field] += 1
                elif mapping == "__4cat_empty_value":
                    mapped_row[field] = ""
                elif mapping == "__4cat_now":
                    mapped_row[field] = now_timestamp
                else:
                    # actual mapping
                    mapped_row[field] = row[mapping]

        # ensure that timestamp is YYYY-MM-DD HH:MM:SS and that there
        # is a unix timestamp. this will override the columns if they
        # already exist! but it is necessary for 4CAT to handle the
        # data in processors etc and should be an equivalent value.
        if not mapped_row.get("timestamp"):
            if mapped_row.get("unix_timestamp"):
                # if unix timestamp is given, convert to datetime
                try:
                    timestamp = datetime.datetime.fromtimestamp(int(mapped_row["unix_timestamp"]))
                    mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
                except (ValueError, OSError) as e:
                    yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['unix_timestamp']}')")
                    continue
            else:
                # BUGFIX: these assignments must only run when no unix
                # timestamp is available either; unconditionally they would
                # clobber the timestamp converted just above
                # no timestamp given, set to empty string
                mapped_row["timestamp"] = ""
                mapped_row["unix_timestamp"] = None

        else:
            try:
                if mapped_row["timestamp"].replace(".", "").isdecimal() and mapped_row["timestamp"].count(".") <= 1:  # ignore . for floats
                    timestamp = datetime.datetime.fromtimestamp(float(mapped_row["timestamp"]))
                else:
                    timestamp = parse_datetime(mapped_row["timestamp"])

                mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
                mapped_row["unix_timestamp"] = int(timestamp.timestamp())

            except (ValueError, OSError, AttributeError) as e:
                # skip rows without a valid timestamp - this may happen
                # despite validation because only a sample is validated
                # this is an OSError on Windows sometimes???
                yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['timestamp']}')")
                continue

        # this ensures that the required columns are always the first
        # columns, and the rest is in original order
        for field, value in row.items():
            if field not in mapped_row and field:
                mapped_row[field] = value

        yield mapped_row


# tools that are supported for importing
# defined here (instead of at the top) so we can refer to the functions
# defined above
# format: dictionary with keys name, columns, mapper
# name is a human-readable name for this format (e.g. a tool name)
# columns is a set of required columns in the uploaded csv
# mapper is a function that writes the 4CAT-compatible CSV
tools = {
    "instagram-crowdtangle": {
        "name": "Instagram (via CrowdTangle export)",
        "columns": {"Account", "User Name", "Followers at Posting", "Post Created", "Type", "Likes", "Comments",
                    "Views", "URL", "Link", "Photo", "Title", "Description"},
        "mapper": import_crowdtangle_instagram
    },
    "facebook-crowdtangle": {
        "name": "Facebook (via CrowdTangle export)",
        "columns": {"Page Name", "User Name", "Facebook Id", "Page Category", "Page Admin Top Country",
                    "Page Description", "Page Created", "Likes at Posting", "Followers at Posting", "Post Created",
                    "Post Created Date", "Post Created Time", "Type", "Total Interactions", "Likes", "Comments",
                    "Shares", "Love", "Wow", "Haha", "Sad", "Angry", "Care", "Video Share Status",
                    "Is Video Owner?", "Post Views", "Total Views", "Total Views For All Crossposts",
                    "Video Length", "URL", "Message", "Link", "Final Link", "Image Text", "Link Text",
                    "Description", "Sponsor Id", "Sponsor Name", "Sponsor Category"},
        "mapper": import_crowdtangle_facebook
    },
    "facepager": {
        "name": "Facebook (via Facepager export)",
        "columns": {"path", "id", "parent_id", "level", "object_id", "object_type", "query_status", "query_time",
                    "query_type", "from.name", "created_time", "type", "link", "picture", "full_picture", "",
                    "comments.summary.total_count", "shares.count", "reactions.summary.total_count",
                    "like.summary.total_count", "love.summary.total_count", "haha.summary.total_count",
                    "wow.summary.total_count", "sad.summary.total_count", "angry.summary.total_count", "message"},
        "mapper": import_facepager
    },
    "youtube_video_list": {
        "name": "YouTube videos (via YouTube Data Tools' Video List module)",
        "columns": {"publishedAt", "videoId", "channelId", "channelTitle", "videoDescription"},
        "mapper": import_ytdt_videolist,
        "csv_dialect": {"doublequote": True, "escapechar": "\\"},
    },
    "youtube_comment_list": {
        "name": "YouTube comments (via YouTube Data Tools' Video Info module)",
        "columns": {"id", "isReplyTo", "authorName", "text", "publishedAt"},
        "mapper": import_ytdt_commentlist,
        "csv_dialect": {"doublequote": True, "escapechar": "\\"},
    },
    "bazhuayu_weibo": {
        "name": "Sina Weibo (via Bazhuayu)",
        "columns": {},
        "mapper": import_bzy_weibo
    },
    "custom": {
        "name": "Custom/other",
        "columns": {
            "id": "A value that uniquely identifies the item, like a numerical ID.",
            "thread_id": "A value that uniquely identifies the sub-collection an item is a part of, e.g. a forum "
                         "thread. If this does not apply to your dataset you can use the same value as for 'id' "
                         "here.",
            "author": "A value that identifies the author of the item. If the option to pseudonymise data is "
                      "selected below, this field will be pseudonymised.",
            "body": "The 'content' of the item, e.g. a post's text.",
            "timestamp": "The time the item was made or posted. 4CAT will try to interpret this value, but for the "
                         "best results use YYYY-MM-DD HH:MM:SS notation."
        },
        "mapper": map_csv_items,
        "allow_user_mapping": True
    }
}
11class InvalidCustomFormat(ProcessorException): 12 """ 13 Raise if processor throws an exception 14 """ 15 pass
Raise if processor throws an exception
Inherited Members
18class InvalidImportedItem: 19 """ 20 Generic data class to pass to have the importer recognise an item as 21 one that should not be written to the result CSV file 22 """ 23 reason = "" 24 25 def __init__(self, reason=""): 26 self.reason = reason
Generic data class to pass to have the importer recognise an item as one that should not be written to the result CSV file
29def import_crowdtangle_instagram(reader, columns, dataset, parameters): 30 """ 31 Import an export of a CrowdTangle Instagram list 32 33 :param csv.DictReader reader: Reader object of input file 34 :param Iterable columns: Required columns 35 :param DataSet dataset: Dataset to import into 36 :param dict parameters: Dataset parameters 37 :return tuple: Items written, items skipped 38 """ 39 # write to the result file 40 hashtag = re.compile(r"#([^\s,.+=-]+)") 41 usertag = re.compile(r"@([^\s,.+=-]+)") 42 for item in reader: 43 url = item["URL"] 44 url = re.sub(r"/*$", "", url) 45 46 post_id = url.split("/")[-1] 47 caption = item["Description"] 48 hashtags = hashtag.findall(caption) 49 usertags = usertag.findall(caption) 50 51 datestamp = " ".join(item["Post Created"].split(" ")[:-1]) 52 date = datetime.datetime.strptime(datestamp, "%Y-%m-%d %H:%M:%S") 53 54 item = { 55 "id": post_id, 56 "thread_id": post_id, 57 "parent_id": post_id, 58 "body": caption if caption is not None else "", 59 "author": item["User Name"], 60 "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'), 61 "unix_timestamp": int(date.timestamp()), 62 "type": "picture" if item["Type"] == "Photo" else item["Type"].lower(), 63 "url": item["URL"], 64 "thumbnail_url": item["Photo"], 65 "hashtags": ",".join(hashtags), 66 "usertags": ",".join(usertags), 67 "mentioned": "", 68 "num_likes": item["Likes"], 69 "num_comments": item["Comments"], 70 "subject": item["Title"] 71 } 72 73 yield item
Import an export of a CrowdTangle Instagram list
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
76def import_crowdtangle_facebook(reader, columns, dataset, parameters): 77 """ 78 Import an export of a CrowdTangle Facebook list 79 80 :param csv.DictReader reader: Reader object of input file 81 :param Iterable columns: Required columns 82 :param DataSet dataset: Dataset to import into 83 :param dict parameters: Dataset parameters 84 :return tuple: Items written, items skipped 85 """ 86 # write to the result file 87 hashtag = re.compile(r"#([^\s,.+=-]+)") 88 entity_name = "Page Name" if "Page Name" in reader.fieldnames else "Group Name" 89 overperforming_column = None 90 for item in reader: 91 hashtags = hashtag.findall(item["Message"]) 92 try: 93 date = datetime.datetime.strptime(" ".join(item["Post Created"].split(" ")[:2]), "%Y-%m-%d %H:%M:%S") 94 except ValueError: 95 yield InvalidImportedItem(reason=f"Cannot parse date/time '{item['Post Created']}'; skipping post") 96 97 is_from_elsewhere = item["Link"].find("https://www.facebook.com/" + item["User Name"]) < 0 98 shared_page = item["Link"].split("/")[3] if is_from_elsewhere and item["Link"].find( 99 "https://www.facebook.com/") == 0 else "" 100 101 # this one is a handful 102 # unicode in csv column names is no fun 103 if not overperforming_column: 104 overperforming_column = [c for c in item.keys() if "Overperforming" in c][0] 105 106 overperforming = item.get(overperforming_column, "") 107 108 item = { 109 "id": item["URL"].split("/")[-1], 110 "thread_id": item["URL"].split("/")[-1], 111 "body": item["Message"], 112 "author": item["User Name"], 113 "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'), 114 "unix_timestamp": int(date.timestamp()), 115 "page_name": item[entity_name], 116 "page_category": item["Page Category"], 117 "page_top_country": item["Page Admin Top Country"], 118 "page_description": item["Page Description"], 119 "page_created": item["Page Created"], 120 "page_likes": item["Likes at Posting"], 121 "page_id": item["Facebook Id"], 122 "page_followers": item["Followers at Posting"], 123 
"page_shared_from": shared_page, 124 "type": item["Type"], 125 "interactions": convert_to_int(re.sub(r"[^0-9]", "", item["Total Interactions"]), 0) if item[ 126 "Total Interactions"] else 0, 127 "comments": item["Comments"], 128 "shares": item["Shares"], 129 "likes": item["Likes"], 130 "likes_love": item["Love"], 131 "likes_wow": item["Wow"], 132 "likes_haha": item["Haha"], 133 "likes_sad": item["Sad"], 134 "likes_angry": item["Angry"], 135 "likes_care": item["Care"], 136 "views_post": item["Post Views"], 137 "views_total": item["Total Views"], 138 "views_total_crossposts": item["Total Views For All Crossposts"], 139 "overperforming_score": overperforming, 140 "video_length": "" if item["Video Length"] == "N/A" else item["Video Length"], 141 "video_status": item["Video Share Status"], 142 "video_own": "yes" if item["Is Video Owner?"] == "Yes" else "no", 143 "url": item["URL"], 144 "hashtags": ",".join(hashtags), 145 "url_original": item["Final Link"] if item["Final Link"] else item["Link"], 146 "body_image": item["Image Text"], 147 "body_link": item["Link Text"], 148 "body_description": item["Description"], 149 "sponsor_id": item["Sponsor Id"], 150 "sponsor_name": item["Sponsor Name"], 151 "sponsor_category": item["Sponsor Category"] 152 } 153 154 yield item
Import an export of a CrowdTangle Facebook list
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
157def import_facepager(reader, columns, dataset, parameters): 158 """ 159 Import an export of a Facepager export 160 161 :param csv.DictReader reader: Reader object of input file 162 :param Iterable columns: Required columns 163 :param DataSet dataset: Dataset to import into 164 :param dict parameters: Dataset parameters 165 :return tuple: Items written, items skipped 166 """ 167 # write to the result file 168 for item in reader: 169 hashtags = json.loads(item["hashtags"]) 170 hashtags = [hashtag["name"] for hashtag in hashtags] 171 172 item = { 173 "id": item["id"], 174 "thread_id": item["id"], 175 "author": item["authorMeta.name"], 176 "body": item["text"], 177 "timestamp": datetime.datetime.utcfromtimestamp(int(item["createTime"])).strftime( 178 '%Y-%m-%d %H:%M:%S'), 179 "unix_timestamp": int(item["createTime"]), 180 "is_harmful": -1, 181 "is_duet": -1, 182 "music_name": item["musicMeta.musicName"], 183 "music_id": item["musicMeta.musicId"], 184 "music_author": item["musicMeta.musicAuthor"], 185 "video_url": item["videoUrl"], 186 "tiktok_url": "https://tiktok.com/@%s/video/%s" % (item["authorMeta.id"], item["id"]), 187 "thumbnail_url": item["covers.default"], 188 "amount_likes": item["diggCount"], 189 "amount_comments": item["commentCount"], 190 "amount_shares": item["shareCount"], 191 "amount_plays": item["playCount"], 192 "hashtags": ",".join(hashtags) 193 } 194 195 yield item
Import an export of a Facepager export
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
198def import_ytdt_videolist(reader, columns, dataset, parameters): 199 """ 200 Import an export of a YouTube Data Tools Video List export 201 202 :param csv.DictReader reader: Reader object of input file 203 :param Iterable columns: Required columns 204 :param DataSet dataset: Dataset to import into 205 :param dict parameters: Dataset parameters 206 :return tuple: Items written, items skipped 207 """ 208 # write to the result file 209 for item in reader: 210 try: 211 date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%dT%H:%M:%SZ") # ex. 2022-11-11T05:30:01Z 212 except ValueError: 213 yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})") 214 continue 215 216 collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "") 217 218 item = { 219 "id": item.get('videoId'), 220 "thread_id": item.get('channelId'), 221 "author": item.get('channelTitle'), 222 "body": item.get('videoDescription'), 223 "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'), 224 "unix_timestamp": int(date.timestamp()), 225 **item, 226 "source_filename": dataset.parameters.get("filename"), 227 "date_collected": collection_date, 228 "youtube_url": f"https://www.youtube.com/watch?v={item['videoId']}" 229 } 230 231 yield item
Import an export of a YouTube Data Tools Video List export
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
269def import_bzy_weibo(reader, columns, dataset, parameter): 270 """ 271 Import Weibo item collected by Bazhuayu 272 273 :param csv.DictReader reader: Reader object of input file 274 :param Iterable columns: Required columns 275 :param DataSet dataset: Dataset to import into 276 :param dict parameters: Dataset parameters 277 :return tuple: Items written, items skipped 278 """ 279 index = 1 280 year = datetime.datetime.now().year 281 282 for item in reader: 283 if "from1" not in item: 284 raise InvalidCustomFormat("CSV does not appear to be Bazhuayu format for Sina Weibo; please try importing again with CSV format set to \"Custom/other\".") 285 raw_timestamp = item["from1"].strip() 286 timestamp_bits = re.split(r"[年月日\s:]+", raw_timestamp) 287 288 if re.match(r"[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp): 289 timestamp = datetime.datetime(year, int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]), 290 int(timestamp_bits[3])) 291 elif re.match(r"[0-9]{4}[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp): 292 293 timestamp = datetime.datetime(int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]), 294 int(timestamp_bits[3]), int(timestamp_bits[4])) 295 else: 296 yield InvalidImportedItem(f"Cannot parse timestamp {raw_timestamp}") 297 298 item = { 299 "id": index, 300 "thread_id": index, 301 "author": item["标题"], 302 "body": item["txt"], 303 "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"), 304 "image_url": item["图片"], 305 **item, 306 "unix_timestamp": int(timestamp.timestamp()) 307 } 308 309 index += 1 310 yield item
Import Weibo item collected by Bazhuayu
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
313def map_csv_items(reader, columns, dataset, parameters): 314 """ 315 Read CSV items and put them in the 4CAT dataset file 316 317 This version of the method mostly just copies the file, applying the 318 supplied mapping where possible. It could alternatively apply more 319 fancy mappings. 320 321 :param csv.DictReader reader: Reader object of input file 322 :param Iterable columns: Required columns 323 :param DataSet dataset: Dataset to import into 324 :param dict parameters: Dataset parameters 325 :return tuple: Items written, items skipped 326 """ 327 # write to the result file 328 indexes = {} 329 now_timestmap = str(int(datetime.datetime.now().timestamp())) 330 for row in reader: 331 mapped_row = {} 332 for field in columns: 333 mapping = parameters.get("mapping-" + field) 334 if mapping: 335 if mapping == "__4cat_auto_sequence": 336 # auto-numbering 337 if field not in indexes: 338 indexes[field] = 1 339 mapped_row[field] = indexes[field] 340 indexes[field] += 1 341 elif mapping == "__4cat_empty_value": 342 mapped_row[field] = "" 343 elif mapping == "__4cat_now": 344 mapped_row[field] = now_timestmap 345 else: 346 # actual mapping 347 mapped_row[field] = row[mapping] 348 349 # ensure that timestamp is YYYY-MM-DD HH:MM:SS and that there 350 # is a unix timestamp. this will override the columns if they 351 # already exist! but it is necessary for 4CAT to handle the 352 # data in processors etc and should be an equivalent value. 
353 if not mapped_row.get("timestamp"): 354 if mapped_row.get("unix_timestamp"): 355 # if unix timestamp is given, convert to datetime 356 try: 357 timestamp = datetime.datetime.fromtimestamp(int(mapped_row["unix_timestamp"])) 358 mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S") 359 except (ValueError, OSError) as e: 360 yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['unix_timestamp']}')") 361 continue 362 363 # no timestamp given, set to empty string 364 mapped_row["timestamp"] = "" 365 mapped_row["unix_timestamp"] = None 366 367 else: 368 try: 369 370 if mapped_row["timestamp"].replace(".", "").isdecimal() and mapped_row["timestamp"].count(".") <= 1: # ignore . for floats 371 timestamp = datetime.datetime.fromtimestamp(float(mapped_row["timestamp"])) 372 else: 373 timestamp = parse_datetime(mapped_row["timestamp"]) 374 375 mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S") 376 mapped_row["unix_timestamp"] = int(timestamp.timestamp()) 377 378 except (ValueError, OSError, AttributeError) as e: 379 # skip rows without a valid timestamp - this may happen 380 # despite validation because only a sample is validated 381 # this is an OSError on Windows sometimes??? 382 yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['timestamp']}')") 383 continue 384 385 # this ensures that the required columns are always the first 386 # columns, and the rest is in original order 387 for field, value in row.items(): 388 if field not in mapped_row and field: 389 mapped_row[field] = value 390 391 yield mapped_row
Read CSV items and put them in the 4CAT dataset file
This version of the method mostly just copies the file, applying the supplied mapping where possible. It could alternatively apply more fancy mappings.
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
Import an export of a YouTube Data Tools Video Info export
Parameters
Returns