datasources.upload.import_formats
import datetime
import json
import csv
import re

from dateutil.parser import parse as parse_datetime
from common.lib.exceptions import ProcessorException
from common.lib.helpers import convert_to_int


class InvalidCustomFormat(ProcessorException):
    """
    Raised when an uploaded file does not match the format expected by the selected import tool
    """
    pass


class InvalidImportedItem:
    """
    Generic data class that an import mapper can yield to signal that an item
    should not be written to the result CSV file
    """
    reason = ""

    def __init__(self, reason=""):
        self.reason = reason


def import_crowdtangle_instagram(reader, columns, dataset, parameters):
    """
    Import an export of a CrowdTangle Instagram list

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    # write to the result file
    hashtag = re.compile(r"#([^\s,.+=-]+)")
    usertag = re.compile(r"@([^\s,.+=-]+)")
    for item in reader:
        url = item["URL"]
        # strip trailing slashes so the post ID is the last path component
        url = re.sub(r"/*$", "", url)

        post_id = url.split("/")[-1]
        caption = item["Description"]
        hashtags = hashtag.findall(caption)
        usertags = usertag.findall(caption)

        datestamp = " ".join(item["Post Created"].split(" ")[:-1])
        date = datetime.datetime.strptime(datestamp, "%Y-%m-%d %H:%M:%S")

        item = {
            "id": post_id,
            "thread_id": post_id,
            "parent_id": post_id,
            "body": caption if caption is not None else "",
            "author": item["User Name"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            "type": "picture" if item["Type"] == "Photo" else item["Type"].lower(),
            "url": item["URL"],
            "thumbnail_url": item["Photo"],
            "hashtags": ",".join(hashtags),
            "usertags": ",".join(usertags),
            "mentioned": "",
            "num_likes": item["Likes"],
            "num_comments": item["Comments"],
            "subject": item["Title"]
        }

        yield item


def import_crowdtangle_facebook(reader, columns, dataset, parameters):
    """
    Import an export of a CrowdTangle Facebook list

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    # write to the result file
    hashtag = re.compile(r"#([^\s,.+=-]+)")
    entity_name = "Page Name" if "Page Name" in reader.fieldnames else "Group Name"
    overperforming_column = None
    for item in reader:
        hashtags = hashtag.findall(item["Message"])
        try:
            date = datetime.datetime.strptime(" ".join(item["Post Created"].split(" ")[:2]), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            yield InvalidImportedItem(reason=f"Cannot parse date/time '{item['Post Created']}'; skipping post")
            # skip posts whose timestamp cannot be parsed
            continue

        is_from_elsewhere = item["Link"].find("https://www.facebook.com/" + item["User Name"]) < 0
        shared_page = item["Link"].split("/")[3] if is_from_elsewhere and item["Link"].find(
            "https://www.facebook.com/") == 0 else ""

        # this one is a handful
        # unicode in csv column names is no fun
        if not overperforming_column:
            overperforming_column = [c for c in item.keys() if "Overperforming" in c][0]

        overperforming = item.get(overperforming_column, "")

        item = {
            "id": item["URL"].split("/")[-1],
            "thread_id": item["URL"].split("/")[-1],
            "body": item["Message"],
            "author": item["User Name"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            "page_name": item[entity_name],
            "page_category": item["Page Category"],
            "page_top_country": item["Page Admin Top Country"],
            "page_description": item["Page Description"],
            "page_created": item["Page Created"],
            "page_likes": item["Likes at Posting"],
            "page_id": item["Facebook Id"],
            "page_followers": item["Followers at Posting"],
            "page_shared_from": shared_page,
            "type": item["Type"],
            "interactions": convert_to_int(re.sub(r"[^0-9]", "", item["Total Interactions"]), 0) if item[
                "Total Interactions"] else 0,
            "comments": item["Comments"],
            "shares": item["Shares"],
            "likes": item["Likes"],
            "likes_love": item["Love"],
            "likes_wow": item["Wow"],
            "likes_haha": item["Haha"],
            "likes_sad": item["Sad"],
            "likes_angry": item["Angry"],
            "likes_care": item["Care"],
            "views_post": item["Post Views"],
            "views_total": item["Total Views"],
            "views_total_crossposts": item["Total Views For All Crossposts"],
            "overperforming_score": overperforming,
            "video_length": "" if item["Video Length"] == "N/A" else item["Video Length"],
            "video_status": item["Video Share Status"],
            "video_own": "yes" if item["Is Video Owner?"] == "Yes" else "no",
            "url": item["URL"],
            "hashtags": ",".join(hashtags),
            "url_original": item["Final Link"] if item["Final Link"] else item["Link"],
            "body_image": item["Image Text"],
            "body_link": item["Link Text"],
            "body_description": item["Description"],
            "sponsor_id": item["Sponsor Id"],
            "sponsor_name": item["Sponsor Name"],
            "sponsor_category": item["Sponsor Category"]
        }

        yield item


def import_facepager(reader, columns, dataset, parameters):
    """
    Import a Facepager export

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    # write to the result file
    for item in reader:
        hashtags = json.loads(item["hashtags"])
        hashtags = [hashtag["name"] for hashtag in hashtags]

        item = {
            "id": item["id"],
            "thread_id": item["id"],
            "author": item["authorMeta.name"],
            "body": item["text"],
            "timestamp": datetime.datetime.utcfromtimestamp(int(item["createTime"])).strftime(
                '%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(item["createTime"]),
            "is_harmful": -1,
            "is_duet": -1,
            "music_name": item["musicMeta.musicName"],
            "music_id": item["musicMeta.musicId"],
            "music_author": item["musicMeta.musicAuthor"],
            "video_url": item["videoUrl"],
            "tiktok_url": "https://tiktok.com/@%s/video/%s" % (item["authorMeta.id"], item["id"]),
            "thumbnail_url": item["covers.default"],
            "amount_likes": item["diggCount"],
            "amount_comments": item["commentCount"],
            "amount_shares": item["shareCount"],
            "amount_plays": item["playCount"],
            "hashtags": ",".join(hashtags)
        }

        yield item


def import_ytdt_videolist(reader, columns, dataset, parameters):
    """
    Import a YouTube Data Tools Video List export

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    # write to the result file
    for item in reader:
        try:
            date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%dT%H:%M:%SZ")  # ex. 2022-11-11T05:30:01Z
        except ValueError:
            yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})")
            continue

        collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "")

        item = {
            "id": item.get('videoId'),
            "thread_id": item.get('channelId'),
            "author": item.get('channelTitle'),
            "body": item.get('videoDescription'),
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            **item,
            "source_filename": dataset.parameters.get("filename"),
            "date_collected": collection_date,
            "youtube_url": f"https://www.youtube.com/watch?v={item['videoId']}"
        }

        yield item


def import_ytdt_commentlist(reader, columns, dataset, parameters):
    """
    Import a YouTube Data Tools Video Info export

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    # write to the result file
    for item in reader:
        try:
            date = datetime.datetime.strptime(item["publishedAt"], "%Y-%m-%d %H:%M:%S")  # ex. 2022-11-11 05:30:01
        except ValueError:
            yield InvalidImportedItem(reason=f"Invalid date ({item['publishedAt']})")
            continue

        collection_date = "_".join(dataset.parameters.get("filename").split("_")[2:]).replace(".csv", "")

        item = {
            "id": item["id"],
            "thread_id": item["isReplyTo"] if item["isReplyTo"] else item["id"],
            "author": item["authorName"],
            "body": item["text"],
            "timestamp": date.strftime('%Y-%m-%d %H:%M:%S'),
            "unix_timestamp": int(date.timestamp()),
            **item,
            "source_filename": dataset.parameters.get("filename"),
            "date_collected": collection_date,
        }

        yield item


def import_bzy_weibo(reader, columns, dataset, parameters):
    """
    Import Weibo items collected by Bazhuayu

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    index = 1
    year = datetime.datetime.now().year

    for item in reader:
        if "from1" not in item:
            raise InvalidCustomFormat("CSV does not appear to be Bazhuayu format for Sina Weibo; please try importing again with CSV format set to \"Custom/other\".")
        raw_timestamp = item["from1"].strip()
        timestamp_bits = re.split(r"[年月日\s:]+", raw_timestamp)

        if re.match(r"[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp):
            # e.g. "03月15日 12:30" - no year given, assume the current year
            timestamp = datetime.datetime(year, int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]),
                                          int(timestamp_bits[3]))
        elif re.match(r"[0-9]{4}年[0-9]{2}月[0-9]{2}日 [0-9]{2}:[0-9]{2}", raw_timestamp):
            # e.g. "2022年03月15日 12:30"
            timestamp = datetime.datetime(int(timestamp_bits[0]), int(timestamp_bits[1]), int(timestamp_bits[2]),
                                          int(timestamp_bits[3]), int(timestamp_bits[4]))
        else:
            yield InvalidImportedItem(f"Cannot parse timestamp {raw_timestamp}")
            continue

        item = {
            "id": index,
            "thread_id": index,
            "author": item["标题"],
            "body": item["txt"],
            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "image_url": item["图片"],
            **item,
            "unix_timestamp": int(timestamp.timestamp())
        }

        index += 1
        yield item


def map_csv_items(reader, columns, dataset, parameters):
    """
    Read CSV items and put them in the 4CAT dataset file

    This version of the method mostly just copies the file, applying the
    supplied mapping where possible. It could alternatively apply more
    fancy mappings.

    :param csv.DictReader reader: Reader object of input file
    :param Iterable columns: Required columns
    :param DataSet dataset: Dataset to import into
    :param dict parameters: Dataset parameters
    :return tuple: Items written, items skipped
    """
    # write to the result file
    indexes = {}
    now_timestamp = str(int(datetime.datetime.now().timestamp()))
    for row in reader:
        mapped_row = {}
        for field in columns:
            mapping = parameters.get("mapping-" + field)
            if mapping:
                if mapping == "__4cat_auto_sequence":
                    # auto-numbering
                    if field not in indexes:
                        indexes[field] = 1
                    mapped_row[field] = indexes[field]
                    indexes[field] += 1
                elif mapping == "__4cat_empty_value":
                    mapped_row[field] = ""
                elif mapping == "__4cat_now":
                    mapped_row[field] = now_timestamp
                else:
                    # actual mapping
                    mapped_row[field] = row[mapping]

        # ensure that timestamp is YYYY-MM-DD HH:MM:SS and that there
        # is a unix timestamp. this will override the columns if they
        # already exist! but it is necessary for 4CAT to handle the
        # data in processors etc and should be an equivalent value.
        try:
            if mapped_row["timestamp"].replace(".", "").isdecimal() and mapped_row["timestamp"].count(".") <= 1:  # ignore . for floats
                timestamp = datetime.datetime.fromtimestamp(float(mapped_row["timestamp"]))
            else:
                timestamp = parse_datetime(mapped_row["timestamp"])

            mapped_row["timestamp"] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
            mapped_row["unix_timestamp"] = int(timestamp.timestamp())

            # this ensures that the required columns are always the first
            # columns, and the rest is in original order
            for field, value in row.items():
                if field not in mapped_row and field:
                    mapped_row[field] = value

        except (ValueError, OSError, AttributeError) as e:
            # skip rows without a valid timestamp - this may happen
            # despite validation because only a sample is validated
            # this is an OSError on Windows sometimes???
            yield InvalidImportedItem(f"{e.__class__.__name__} - {e} (value was '{mapped_row['timestamp']}')")
            continue

        yield mapped_row


# tools that are supported for importing
# defined here (instead of at the top) so we can refer to the functions
# defined above
# format: dictionary with keys name, columns, mapper
# name is a human-readable name for this format (e.g. a tool name)
# columns is a set of required columns in the uploaded csv
# mapper is a function that writes the 4CAT-compatible CSV
tools = {
    "instagram-crowdtangle": {
        "name": "Instagram (via CrowdTangle export)",
        "columns": {"Account", "User Name", "Followers at Posting", "Post Created", "Type", "Likes", "Comments",
                    "Views", "URL", "Link", "Photo", "Title", "Description"},
        "mapper": import_crowdtangle_instagram
    },
    "facebook-crowdtangle": {
        "name": "Facebook (via CrowdTangle export)",
        "columns": {"Page Name", "User Name", "Facebook Id", "Page Category", "Page Admin Top Country",
                    "Page Description", "Page Created", "Likes at Posting", "Followers at Posting", "Post Created",
                    "Post Created Date", "Post Created Time", "Type", "Total Interactions", "Likes", "Comments",
                    "Shares", "Love", "Wow", "Haha", "Sad", "Angry", "Care", "Video Share Status",
                    "Is Video Owner?", "Post Views", "Total Views", "Total Views For All Crossposts",
                    "Video Length", "URL", "Message", "Link", "Final Link", "Image Text", "Link Text",
                    "Description", "Sponsor Id", "Sponsor Name", "Sponsor Category"},
        "mapper": import_crowdtangle_facebook
    },
    "facepager": {
        "name": "Facebook (via Facepager export)",
        "columns": {"path", "id", "parent_id", "level", "object_id", "object_type", "query_status", "query_time",
                    "query_type", "from.name", "created_time", "type", "link", "picture", "full_picture", "",
                    "comments.summary.total_count", "shares.count", "reactions.summary.total_count",
                    "like.summary.total_count", "love.summary.total_count", "haha.summary.total_count",
                    "wow.summary.total_count", "sad.summary.total_count", "angry.summary.total_count", "message"},
        "mapper": import_facepager
    },
    "youtube_video_list": {
        "name": "YouTube videos (via YouTube Data Tools' Video List module)",
        "columns": {"publishedAt", "videoId", "channelId", "channelTitle", "videoDescription"},
        "mapper": import_ytdt_videolist,
        "csv_dialect": {"doublequote": True, "escapechar": "\\"},
    },
    "youtube_comment_list": {
        "name": "YouTube comments (via YouTube Data Tools' Video Info module)",
        "columns": {"id", "isReplyTo", "authorName", "text", "publishedAt"},
        "mapper": import_ytdt_commentlist,
        "csv_dialect": {"doublequote": True, "escapechar": "\\"},
    },
    "bazhuayu_weibo": {
        "name": "Sina Weibo (via Bazhuayu)",
        "columns": {},
        "mapper": import_bzy_weibo
    },
    "custom": {
        "name": "Custom/other",
        "columns": {
            "id": "A value that uniquely identifies the item, like a numerical ID.",
            "thread_id": "A value that uniquely identifies the sub-collection an item is a part of, e.g. a forum "
                         "thread. If this does not apply to your dataset you can use the same value as for 'id' "
                         "here.",
            "author": "A value that identifies the author of the item. If the option to pseudonymise data is "
                      "selected below, this field will be pseudonymised.",
            "body": "The 'content' of the item, e.g. a post's text.",
            "timestamp": "The time the item was made or posted. 4CAT will try to interpret this value, but for the "
                         "best results use YYYY-MM-DD HH:MM:SS notation."
        },
        "mapper": map_csv_items,
        "allow_user_mapping": True
    }
}
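The tools dictionary above is the registry the upload importer works from: each entry names a format, lists the columns it requires, and points at the mapper function that turns rows into 4CAT items. The sketch below shows, under stated assumptions, how a caller could pick an entry, apply its optional csv_dialect, and separate mapped items from skipped ones. The file name, the SimpleNamespace stand-in for 4CAT's DataSet, and the empty parameters dict are illustrative, not part of 4CAT's actual upload code.

# Hedged usage sketch (not part of the module): run an uploaded CSV through a
# mapper from the "tools" registry and count written vs. skipped items.
import csv
from types import SimpleNamespace

from datasources.upload.import_formats import tools, InvalidImportedItem

tool = tools["youtube_video_list"]
dialect = tool.get("csv_dialect", {})  # per-format csv quirks, e.g. escapechar

# illustrative file name; the YTDT mappers read the collection date from it
filename = "videolist_search_2023_01_15.csv"

with open(filename, encoding="utf-8", newline="") as infile:
    reader = csv.DictReader(infile, **dialect)

    missing = set(tool["columns"]) - set(reader.fieldnames or [])
    if missing:
        raise ValueError(f"Input file is missing required columns: {missing}")

    # minimal stand-in for 4CAT's DataSet object; only .parameters is read here
    dataset = SimpleNamespace(parameters={"filename": filename})

    written = skipped = 0
    for mapped in tool["mapper"](reader, tool["columns"], dataset, {}):
        if isinstance(mapped, InvalidImportedItem):
            skipped += 1  # mapped.reason explains why the row was rejected
        else:
            written += 1  # a flat dict ready to be written to the result CSV

print(f"{written} items written, {skipped} skipped")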
class InvalidCustomFormat(ProcessorException):
Raised when an uploaded file does not match the format expected by the selected import tool.
class InvalidImportedItem:
Generic data class that an import mapper can yield to signal that an item should not be written to the result CSV file.
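The mappers below follow a simple convention with this class: rows that cannot be mapped are not dropped silently but yielded as InvalidImportedItem, so the caller can count and report them. A minimal, hypothetical mapper illustrating that convention (import_example and its column names are made up):

def import_example(reader, columns, dataset, parameters):
    # hypothetical mapper, for illustration only
    for row in reader:
        if not row.get("id"):
            # signal "skip this row" instead of writing it to the result file
            yield InvalidImportedItem(reason="Row has no 'id' value")
            continue

        yield {"id": row["id"], "thread_id": row["id"], "body": row.get("text", "")}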
def import_crowdtangle_instagram(reader, columns, dataset, parameters):
Import an export of a CrowdTangle Instagram list
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
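The hashtags and usertags columns produced by this mapper come from running the two regular expressions above over the post caption (the Description column). A quick illustration with a made-up caption shows what the character class [^\s,.+=-] does and does not capture:

import re

hashtag = re.compile(r"#([^\s,.+=-]+)")
usertag = re.compile(r"@([^\s,.+=-]+)")

caption = "New drop with @studio_example, photos soon. #streetwear #ootd"
print(hashtag.findall(caption))  # ['streetwear', 'ootd'] - tags end at whitespace , . + = -
print(usertag.findall(caption))  # ['studio_example']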
def import_crowdtangle_facebook(reader, columns, dataset, parameters):
Import an export of a CrowdTangle Facebook list
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
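One detail worth calling out in this mapper is the interactions field: CrowdTangle exports Total Interactions with thousands separators, so the value is stripped down to digits before conversion, with 0 as the fallback. The helper below is only a rough stand-in for 4CAT's convert_to_int, to show the effect:

import re

def interactions_to_int(value):
    # illustrative stand-in for convert_to_int(re.sub(r"[^0-9]", "", value), 0)
    digits = re.sub(r"[^0-9]", "", value) if value else ""
    return int(digits) if digits.isdigit() else 0

print(interactions_to_int("12,345"))  # 12345
print(interactions_to_int("N/A"))     # 0
print(interactions_to_int(""))        # 0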
def import_facepager(reader, columns, dataset, parameters):
Import a Facepager export
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
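In these exports the hashtags column is not a plain string but a JSON-encoded list of objects with a name key, which the mapper flattens into a comma-separated string. A small illustration with a made-up cell value:

import json

cell = '[{"name": "fyp"}, {"name": "duet"}]'  # illustrative cell contents

hashtags = [hashtag["name"] for hashtag in json.loads(cell)]
print(",".join(hashtags))  # fyp,duet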
def import_ytdt_videolist(reader, columns, dataset, parameters):
Import a YouTube Data Tools Video List export
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
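Two conventions in this mapper are easy to miss: publishedAt must be in the ISO-8601 "Z" form that YouTube Data Tools exports, and the date_collected value is derived from the uploaded file's name by taking everything after the second underscore. A quick check with made-up values:

import datetime

# publishedAt as exported by YouTube Data Tools, e.g. 2022-11-11T05:30:01Z
date = datetime.datetime.strptime("2022-11-11T05:30:01Z", "%Y-%m-%dT%H:%M:%SZ")
print(date.strftime("%Y-%m-%d %H:%M:%S"))  # 2022-11-11 05:30:01

# illustrative file name; anything after the second underscore is the collection date
filename = "videolist_search_2023_01_15.csv"
print("_".join(filename.split("_")[2:]).replace(".csv", ""))  # 2023_01_15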
def import_bzy_weibo(reader, columns, dataset, parameters):
Import Weibo items collected by Bazhuayu
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
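Bazhuayu exports carry the post time in the from1 column as a Chinese-formatted string; the mapper splits it on 年, 月, 日, whitespace and ":" and assumes the current year when none is given. A short illustration with a made-up value:

import datetime
import re

raw_timestamp = "03月15日 12:30"  # illustrative value without a year
bits = re.split(r"[年月日\s:]+", raw_timestamp)
# bits == ['03', '15', '12', '30'] -> month, day, hour, minute

timestamp = datetime.datetime(datetime.datetime.now().year,
                              int(bits[0]), int(bits[1]), int(bits[2]), int(bits[3]))
print(timestamp.strftime("%Y-%m-%d %H:%M:%S"))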
def map_csv_items(reader, columns, dataset, parameters):
Read CSV items and put them in the 4CAT dataset file
This version of the method mostly just copies the file, applying the supplied mapping where possible. It could alternatively apply more fancy mappings.
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped
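For the Custom/other format the column mapping arrives through parameters as mapping-<field> entries, whose values are either a column name in the uploaded file or one of the special markers __4cat_auto_sequence (auto-numbering), __4cat_empty_value (always empty) and __4cat_now (time of import). A hedged sketch with a made-up CSV; the dataset argument is unused by this function, so None is passed:

import csv
import io

from datasources.upload.import_formats import map_csv_items  # defined above

raw = "post_id,message,created\n17,hello world,2023-01-15 10:00:00\n"  # illustrative input
reader = csv.DictReader(io.StringIO(raw))

columns = ("id", "thread_id", "author", "body", "timestamp")
parameters = {
    "mapping-id": "post_id",
    "mapping-thread_id": "__4cat_auto_sequence",  # 1, 2, 3, ...
    "mapping-author": "__4cat_empty_value",       # always ""
    "mapping-body": "message",
    "mapping-timestamp": "created",               # parsed, then normalised
}

for item in map_csv_items(reader, columns, None, parameters):
    # mapped fields come first, then the original columns in their own order;
    # timestamp is normalised and unix_timestamp is added
    print(item)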
def import_ytdt_commentlist(reader, columns, dataset, parameters):
Import a YouTube Data Tools Video Info export
Parameters
- csv.DictReader reader: Reader object of input file
- Iterable columns: Required columns
- DataSet dataset: Dataset to import into
- dict parameters: Dataset parameters
Returns
Items written, items skipped