datasources.vk.search_vk
VK keyword search
"""
VK keyword search
"""
import datetime

import vk_api

from backend.lib.search import Search
from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException
from common.lib.helpers import UserInput
from common.lib.item_mapping import MappedItem


class SearchVK(Search):
    """
    Get posts via the VK API

    Logs in with a user-supplied username and password, runs a newsfeed
    keyword search and (optionally) collects the comments and replies on each
    post that was found.
    """
    type = "vk-search"  # job ID
    title = "VK"
    extension = "ndjson"
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    previous_request = 0
    import_issues = True

    references = [
        "[VK API documentation](https://vk.com/dev/first_guide)",
        "[Python API wrapper](https://github.com/python273/vk_api)"
    ]

    # Profile/group fields requested together with posts and comments
    expanded_profile_fields = "id,screen_name,first_name,last_name,name,deactivated,is_closed,is_admin,sex,city,country,photo_200,photo_100,photo_50,followers_count,members_count"  # https://vk.com/dev/objects/user & https://vk.com/dev/objects/group

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get VK data source options

        :param config:
        :param parent_dataset:  Should always be None
        :return dict:  Data source options
        """

        intro_text = ("This data source uses VK's [API](https://vk.com/dev/first_guide) and a python "
                      "[wrapper](https://github.com/python273/vk_api) to request information from VK using your "
                      "username and password.")

        return {
            "intro-1": {
                "type": UserInput.OPTION_INFO,
                "help": intro_text
            },
            "query_type": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Query Type",
                "options": {
                    "newsfeed": "News Feed search",
                },
                "default": "newsfeed"
            },
            "intro-2": {
                "type": UserInput.OPTION_INFO,
                "help": "Your username and password will be deleted after your query is complete."
            },
            "username": {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "VK Username"
            },
            "password": {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "VK Password"
            },
            "intro-3": {
                "type": UserInput.OPTION_INFO,
                "help": "Enter the text to search for below."
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Query"
            },
            "amount": {
                "type": UserInput.OPTION_TEXT,
                "help": "Max items to retrieve",
                "min": 0,
                "max": 1000,
                "default": 100
            },
            "include_comments": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Include post comments",
                "default": False,
                "tooltip": ""
            },
            "divider-2": {
                "type": UserInput.OPTION_DIVIDER
            },
            "daterange-info": {
                "type": UserInput.OPTION_INFO,
                "help": "VK daterange defaults vary by type of query. For the News Feed, posts are returned starting "
                        "with the most recent and working backwards."
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range"
            },
        }

    def get_items(self, query):
        """
        Use the VK API

        Logs in, runs the configured query and yields every post (followed, if
        requested, by that post's comments and replies) as it is retrieved.
        Each yielded item is tagged with a `4cat_item_type` of `post` or
        `comment`.

        :param query:  Not read directly; the stored `self.parameters` are
          used instead
        :return:  Generator of VK items (dicts)
        :raises ProcessorInterruptedException:  If the worker is interrupted
          between result batches
        """
        username = self.parameters.get("username")
        password = self.parameters.get("password")
        if username is None or password is None:
            self.dataset.update_status(
                "VK query failed or was interrupted; please create new query in order to provide username and password again.",
                is_final=True)
            return

        self.dataset.update_status("Logging in to VK")
        try:
            vk_session = self.login(username, password)
        except vk_api.exceptions.AuthError as e:
            self.log.warning(f"VK Auth Issues: {e}")
            self.dataset.update_status(f"VK unable to authorize user: {e}", is_final=True)
            return

        query_type = self.parameters.get("query_type")
        include_comments = self.parameters.get("include_comments", False)

        # Only the newsfeed search is implemented; other types yield nothing
        if query_type != "newsfeed":
            return

        max_amount = self.parameters.get("amount")
        query_parameters = {"query": self.parameters.get("query"),
                            "max_amount": max_amount}

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['start_time'] = self.parameters.get("min_date")
        if self.parameters.get("max_date"):
            query_parameters['end_time'] = self.parameters.get("max_date")

        vk_helper = vk_session.get_api()

        # Collect Newsfeed results
        num_results = 0
        self.dataset.update_status("Submitting query...")
        for i, result_batch in enumerate(self.search_newsfeed(vk_helper, **query_parameters)):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching newsfeed data from the VK API")

            self.dataset.update_status(f"Processing results batch {i+1}")
            for result in result_batch:
                result.update({'4cat_item_type': 'post'})
                yield result
                num_results += 1

                if include_comments:
                    for comment in self.collect_all_comments(vk_helper, owner_id=result.get("owner_id"), post_id=result.get("id")):
                        comment.update({'4cat_item_type': 'comment'})
                        yield comment

            self.dataset.update_status(f"Received {num_results} results of max {self.parameters.get('amount')} from the VK API")
            # "amount" may be 0 according to the option definition; avoid dividing by zero
            if max_amount:
                self.dataset.update_progress(num_results / max_amount)

    def login(self, username, password):
        """
        Login and authenticate user

        :param str username:  VK username
        :param str password:  VK password
        :return:  Authenticated `vk_api.VkApi` session; its session config is
          stored in a per-user file under the `PATH_SESSIONS` folder
        """
        vk_session = vk_api.VkApi(username,
                                  password,
                                  config_filename=self.config.get("PATH_SESSIONS").joinpath(username+"-vk_config.json"))
        vk_session.auth()

        return vk_session

    def search_newsfeed(self, vk_helper, query, max_amount, num_collected=0, start_time=None, end_time=None, **kwargs):
        """
        Collects all newsfeed posts

        Requests pages of at most 200 posts until `max_amount` posts have been
        collected or VK stops returning a `next_from` cursor. Each post gets an
        `author_profile` key with the matching profile/group (or an empty dict).

        :param Object vk_helper:    Authorized vk_api.VkApi
        :param str query:           String representing the search query
        :param int max_amount:      Max number of posts to collect
        :param int num_collected:   Number of previously collected results
        :param int start_time:      Timestamp for earliest post
        :param int end_time:        Timestamp for latest post
        :param kwargs:              May contain `start_from`, a `next_from`
          cursor from an earlier response, to resume; other keys are ignored
        :return generator:          Yields groups of posts
        """
        parameters = {
            "q": query,
            "extended": 1,
            "fields": self.expanded_profile_fields,
        }
        if start_time:
            parameters["start_time"] = start_time
        if end_time:
            parameters["end_time"] = end_time
        if kwargs.get("start_from"):
            parameters["start_from"] = kwargs["start_from"]

        # Nothing is requested when no posts remain to be collected
        while num_collected < max_amount:
            parameters["count"] = min(max_amount - num_collected, 200)

            response = vk_helper.newsfeed.search(**parameters)
            news_feed_results = response.get("items", [])
            num_collected += len(news_feed_results)

            # Flesh out profiles and groups
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for result in news_feed_results:
                result.update({"author_profile": author_profiles.get(result.get("from_id"), {})})

            yield news_feed_results

            # Stop when there is no further page, or when a page came back
            # empty (which would otherwise loop without making progress)
            next_from = response.get("next_from")
            if not next_from or not news_feed_results:
                break

            # The cursor must be sent along, or VK returns the first page again
            parameters["start_from"] = next_from

    def collect_all_comments(self, vk_helper, owner_id, post_id):
        """
        Collects all comments and replies to a VK post

        Every top-level comment is yielded first, followed by its replies.
        Comments arrive with up to 10 replies embedded; when more exist, the
        rest are fetched and appended to the embedded list.

        :param Object vk_helper:           Authorized vk_api.VkApi
        :param int owner_id:               Owner ID provided by post/comment/etc
        :param int post_id:                ID of post from which to collect comments
        :return generator:                 Yields comments and replies
        """
        # Collect top level comments from post
        comments = self.get_comments(vk_helper, owner_id, post_id=post_id)

        # Extract replies and collect more if needed
        for comment in comments:
            yield comment

            reply_count = comment.get("thread", {}).get("count", 0)
            replies = comment.get("thread", {}).get("items", [])
            if reply_count > 10 and len(replies) == 10:
                # Collect additional replies; the first one returned is the
                # last reply we already have, so it is dropped
                replies += self.get_comments(vk_helper, owner_id, comment_id=comment.get("id"), last_collected_id=replies[-1].get("id"))[1:]

            for reply in replies:
                yield reply
                if reply.get("thread"):
                    self.log.warning("VK Datasource issue with replies: additional depth needs to be handled; contact 4CAT devs")
                    # TODO: this will need modification if reply threads gain depth

    def get_comments(self, vk_helper, owner_id, post_id=None, comment_id=None, last_collected_id=None, **kwargs):
        """
        Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either
        post_id or comment_id, but not both.

        Pages of 100 comments are requested until a page is not full. If the
        API raises an error, it is logged to the dataset and whatever has been
        collected so far is returned.

        More information can be found here:
        https://vk.com/dev/wall.getComments

        :param Object vk_helper:           Authorized vk_api.VkApi
        :param int owner_id:               Owner ID provided by post/comment/etc
        :param int post_id:                ID of post from which to collect comments
        :param int comment_id:             ID of comment from which to collect comments
        :param int last_collected_id:      ID of the last comment to collected; used as start to continue collecting
                                           comments. That comment is returned again as the first item.
        :param kwargs:                     Ignored
        :return list:                      List of comments
        :raises ProcessorException:  If neither post_id nor comment_id is given
        :raises ProcessorInterruptedException:  If the worker is interrupted
        """
        if post_id is None and comment_id is None:
            raise ProcessorException("Must provide either post_id or comment_id to collect comments from VK")

        parameters = {
            "owner_id": owner_id,
            "need_likes": 1,
            "preview_length": 0,
            "extended": 1,
            "count": 100,
            "thread_items_count": 10,
            "fields": self.expanded_profile_fields,
        }
        if post_id:
            parameters.update({"post_id": post_id})
        if comment_id:
            parameters.update({"comment_id": comment_id})
        if last_collected_id:
            parameters.update({"start_comment_id": last_collected_id})

        comments = []
        first_page = True
        while True:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

            # Collect comments from VK
            try:
                response = vk_helper.wall.getComments(**parameters)
            except vk_api.exceptions.ApiError as e:
                self.dataset.log(f"Unable to collect comments for owner_id {owner_id} and {'post_id' if post_id is not None else 'comment_id'} {post_id if post_id is not None else comment_id}: {e}")
                break
            page = response.get("items", [])

            # Flesh out profiles and groups, for comments and embedded replies
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for comment in page:
                comment.update({"author_profile": author_profiles.get(comment.get("from_id"), {})})
                if comment.get("thread"):
                    for reply in comment.get("thread", {}).get("items", []):
                        reply.update({"author_profile": author_profiles.get(reply.get("from_id"), {})})

            # Follow-up pages start at the last comment already collected, so
            # their first item is a duplicate
            comments += page if first_page else page[1:]
            first_page = False

            # Check if there are potentially additional comments
            if not ((response.get("count") or 0) > 100 and len(page) == 100):
                break

            next_start = page[-1].get("id")
            if next_start == parameters.get("start_comment_id"):
                # no progress was made; stop instead of requesting the same page forever
                break
            parameters.update({"start_comment_id": next_start})

        return comments

    @staticmethod
    def expand_profile_fields(dict_of_profile_types):
        """
        Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to
        differentiate source of data later.

        :param dict dict_of_profile_types:  Lists of profile dicts, keyed by
          the name of the profile type
        :return dict:  Profiles keyed by their `id`
        :raises ProcessorException:  If a profile has no `id` or the same `id`
          occurs more than once
        """
        # NOTE(review): posts are looked up by `from_id`; confirm against the VK
        # docs whether group authors use the same sign for `from_id` and `id`
        author_types = {}
        for profile_type, profiles in dict_of_profile_types.items():
            for profile in profiles:
                if "id" not in profile:
                    raise ProcessorException("Profile missing id field; VK data format incorrect/changed")
                elif profile.get("id") in author_types:
                    raise ProcessorException("Profile id duplicated across profile types; unable to combine profiles")
                profile.update({"4CAT_author_profile_type": profile_type})
                author_types[profile.get("id")] = profile
        return author_types

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate input for a dataset query on the VK data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If no query is given or the date
          range ends before it starts
        """
        # Please provide something...
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # the dates need to make sense as a range to search within
        # but, on VK, you can also specify before *or* after only
        after, before = query.get("daterange") or (None, None)
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # TODO: test username and password?

        # if we made it this far, the query can be executed
        params = {
            "query":  query.get("query"),
            "query_type": query.get("query_type"),
            "amount": query.get("amount"),
            "include_comments": query.get("include_comments"),
            "min_date": after,
            "max_date": before,
            "username": query.get("username"),
            "password": query.get("password"),
        }

        return params

    @staticmethod
    def map_item(item):
        """
        Map a nested VK object to a flat dictionary

        :param item:  VK object as originally returned by the VK API
        :return dict:  Dictionary in the format expected by 4CAT
        """
        vk_item_time = datetime.datetime.fromtimestamp(item.get('date'))

        # Process attachments
        photos = []
        videos = []
        audio = []
        links = []
        docs = []
        for attachment in item.get("attachments", []):
            attachment_type = attachment.get("type")
            # The payload is stored under a key named after the type; skip
            # attachments where it is missing instead of crashing
            payload = attachment.get(attachment_type)
            if not isinstance(payload, dict):
                continue

            if attachment_type == "photo":
                if payload.get("sizes"):
                    # use the widest available rendition
                    photos.append(max(payload.get("sizes"), key=lambda d: d.get('width', 0)).get('url'))
                else:
                    photos.append(str(payload))
            elif attachment_type == "video":
                # TODO: can I get the actual URL? Does not seem like it...
                videos.append(f"https://vk.com/video{payload.get('owner_id')}_{payload.get('id')}")
            elif attachment_type == "audio":
                # TODO: Seem unable to create the URL with provided information...
                audio.append(f"{payload.get('artist')} - {payload.get('title')}")
            elif attachment_type == "link":
                links.append(payload.get('url', str(payload)))
            elif attachment_type == "doc":
                docs.append(payload.get('url', str(payload)))

        # Use 4cat_item_type to populate different fields
        thread_id = ""
        in_reply_to_user = ""
        in_reply_to_comment_id = ""
        if item.get("4cat_item_type") == "post":
            thread_id = item.get("id")
        elif item.get("4cat_item_type") == "comment":
            thread_id = item.get("post_id")
            in_reply_to_user = item.get("reply_to_user")
            in_reply_to_comment_id = item.get("reply_to_comment")

        author_profile = item.get("author_profile", {})
        profile_type = author_profile.get("4CAT_author_profile_type")
        profile_source = "user" if profile_type == "profile" else "community" if profile_type == "group" else "N/A"
        # Use source of author profile if "type" not present (e.g., in users profiles do not seem to have type)
        author_type = author_profile.get("type", profile_source)

        # Numeric flags are translated to labels; unknown values pass through
        # unchanged and missing ones become "N/A"
        sex = author_profile.get("sex")
        is_admin = author_profile.get("is_admin")
        is_advertiser = author_profile.get("is_advertiser")
        is_closed = author_profile.get("is_closed")

        return MappedItem({
            "id": item.get("id"),
            "thread_id": thread_id,
            "timestamp": vk_item_time.strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(vk_item_time.timestamp()),
            "link": f"https://vk.com/wall{item.get('owner_id')}_{item.get('id')}",
            "item_type": item.get("4cat_item_type"),
            "body": item.get("text"),
            "author_id": item.get("from_id"),
            "author_type": author_type,
            "author_screen_name": author_profile.get("screen_name"),
            "author_name": author_profile.get("name", " ".join([author_profile.get("first_name", ""), author_profile.get("last_name", "")])),
            "author_sex": "F" if sex == 1 else "M" if sex == 2 else "Not Specified" if sex == 0 else author_profile.get("sex", "N/A"),
            "author_city": author_profile.get("city", {}).get("title", ""),
            "author_country": author_profile.get("country", {}).get("title", ""),
            "author_photo": author_profile.get("photo_200",
                                               author_profile.get("photo_100", author_profile.get("photo_50", ""))),
            "author_is_admin": True if is_admin == 1 else False if is_admin == 0 else author_profile.get("is_admin", "N/A"),
            "author_is_advertiser": True if is_advertiser == 1 else False if is_advertiser == 0 else author_profile.get("is_advertiser", "N/A"),
            # the field requested from VK is "deactivated"; "is_deactivated" is kept as fallback
            "author_deactivated": author_profile.get("deactivated", author_profile.get("is_deactivated", False)),
            "author_privacy_is_closed": 'closed' if is_closed == 1 else 'open' if is_closed == 0 else 'private' if is_closed == 2 else author_profile.get("is_closed", "N/A"),
            "author_followers": author_profile.get("followers_count", author_profile.get("members_count", "N/A")),
            "in_reply_to_user": in_reply_to_user,
            "in_reply_to_comment_id": in_reply_to_comment_id,
            "source": item.get("post_source", {}).get("type"),
            "views": item.get("views", {}).get("count"),
            "likes": item.get("likes", {}).get("count"),
            "post_comments": item.get("comments", {}).get("count"),
            "edited": datetime.datetime.fromtimestamp(item.get("edited")).strftime("%Y-%m-%d %H:%M:%S") if item.get("edited", False) else False,
            "photos": ", ".join(photos),
            "videos": ", ".join(videos),
            "audio": ", ".join(audio),
            "links": ", ".join(links),
            "docs": ", ".join(docs),
            "subject": "",
        })
class SearchVK(Search):
    """
    Get posts via the VK API

    Logs in with a user-supplied username and password, runs a newsfeed
    keyword search and (optionally) collects the comments and replies on each
    post that was found.
    """
    type = "vk-search"  # job ID
    title = "VK"
    extension = "ndjson"
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    previous_request = 0
    import_issues = True

    references = [
        "[VK API documentation](https://vk.com/dev/first_guide)",
        "[Python API wrapper](https://github.com/python273/vk_api)"
    ]

    # Profile/group fields requested together with posts and comments
    expanded_profile_fields = "id,screen_name,first_name,last_name,name,deactivated,is_closed,is_admin,sex,city,country,photo_200,photo_100,photo_50,followers_count,members_count"  # https://vk.com/dev/objects/user & https://vk.com/dev/objects/group

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get VK data source options

        :param config:
        :param parent_dataset:  Should always be None
        :return dict:  Data source options
        """

        intro_text = ("This data source uses VK's [API](https://vk.com/dev/first_guide) and a python "
                      "[wrapper](https://github.com/python273/vk_api) to request information from VK using your "
                      "username and password.")

        return {
            "intro-1": {
                "type": UserInput.OPTION_INFO,
                "help": intro_text
            },
            "query_type": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Query Type",
                "options": {
                    "newsfeed": "News Feed search",
                },
                "default": "newsfeed"
            },
            "intro-2": {
                "type": UserInput.OPTION_INFO,
                "help": "Your username and password will be deleted after your query is complete."
            },
            "username": {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "VK Username"
            },
            "password": {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "VK Password"
            },
            "intro-3": {
                "type": UserInput.OPTION_INFO,
                "help": "Enter the text to search for below."
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Query"
            },
            "amount": {
                "type": UserInput.OPTION_TEXT,
                "help": "Max items to retrieve",
                "min": 0,
                "max": 1000,
                "default": 100
            },
            "include_comments": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Include post comments",
                "default": False,
                "tooltip": ""
            },
            "divider-2": {
                "type": UserInput.OPTION_DIVIDER
            },
            "daterange-info": {
                "type": UserInput.OPTION_INFO,
                "help": "VK daterange defaults vary by type of query. For the News Feed, posts are returned starting "
                        "with the most recent and working backwards."
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range"
            },
        }

    def get_items(self, query):
        """
        Use the VK API

        Logs in, runs the configured query and yields every post (followed, if
        requested, by that post's comments and replies) as it is retrieved.
        Each yielded item is tagged with a `4cat_item_type` of `post` or
        `comment`.

        :param query:  Not read directly; the stored `self.parameters` are
          used instead
        :return:  Generator of VK items (dicts)
        :raises ProcessorInterruptedException:  If the worker is interrupted
          between result batches
        """
        username = self.parameters.get("username")
        password = self.parameters.get("password")
        if username is None or password is None:
            self.dataset.update_status(
                "VK query failed or was interrupted; please create new query in order to provide username and password again.",
                is_final=True)
            return

        self.dataset.update_status("Logging in to VK")
        try:
            vk_session = self.login(username, password)
        except vk_api.exceptions.AuthError as e:
            self.log.warning(f"VK Auth Issues: {e}")
            self.dataset.update_status(f"VK unable to authorize user: {e}", is_final=True)
            return

        query_type = self.parameters.get("query_type")
        include_comments = self.parameters.get("include_comments", False)

        # Only the newsfeed search is implemented; other types yield nothing
        if query_type != "newsfeed":
            return

        max_amount = self.parameters.get("amount")
        query_parameters = {"query": self.parameters.get("query"),
                            "max_amount": max_amount}

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['start_time'] = self.parameters.get("min_date")
        if self.parameters.get("max_date"):
            query_parameters['end_time'] = self.parameters.get("max_date")

        vk_helper = vk_session.get_api()

        # Collect Newsfeed results
        num_results = 0
        self.dataset.update_status("Submitting query...")
        for i, result_batch in enumerate(self.search_newsfeed(vk_helper, **query_parameters)):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching newsfeed data from the VK API")

            self.dataset.update_status(f"Processing results batch {i+1}")
            for result in result_batch:
                result.update({'4cat_item_type': 'post'})
                yield result
                num_results += 1

                if include_comments:
                    for comment in self.collect_all_comments(vk_helper, owner_id=result.get("owner_id"), post_id=result.get("id")):
                        comment.update({'4cat_item_type': 'comment'})
                        yield comment

            self.dataset.update_status(f"Received {num_results} results of max {self.parameters.get('amount')} from the VK API")
            # "amount" may be 0 according to the option definition; avoid dividing by zero
            if max_amount:
                self.dataset.update_progress(num_results / max_amount)

    def login(self, username, password):
        """
        Login and authenticate user

        :param str username:  VK username
        :param str password:  VK password
        :return:  Authenticated `vk_api.VkApi` session; its session config is
          stored in a per-user file under the `PATH_SESSIONS` folder
        """
        vk_session = vk_api.VkApi(username,
                                  password,
                                  config_filename=self.config.get("PATH_SESSIONS").joinpath(username+"-vk_config.json"))
        vk_session.auth()

        return vk_session

    def search_newsfeed(self, vk_helper, query, max_amount, num_collected=0, start_time=None, end_time=None, **kwargs):
        """
        Collects all newsfeed posts

        Requests pages of at most 200 posts until `max_amount` posts have been
        collected or VK stops returning a `next_from` cursor. Each post gets an
        `author_profile` key with the matching profile/group (or an empty dict).

        :param Object vk_helper:    Authorized vk_api.VkApi
        :param str query:           String representing the search query
        :param int max_amount:      Max number of posts to collect
        :param int num_collected:   Number of previously collected results
        :param int start_time:      Timestamp for earliest post
        :param int end_time:        Timestamp for latest post
        :param kwargs:              May contain `start_from`, a `next_from`
          cursor from an earlier response, to resume; other keys are ignored
        :return generator:          Yields groups of posts
        """
        parameters = {
            "q": query,
            "extended": 1,
            "fields": self.expanded_profile_fields,
        }
        if start_time:
            parameters["start_time"] = start_time
        if end_time:
            parameters["end_time"] = end_time
        if kwargs.get("start_from"):
            parameters["start_from"] = kwargs["start_from"]

        # Nothing is requested when no posts remain to be collected
        while num_collected < max_amount:
            parameters["count"] = min(max_amount - num_collected, 200)

            response = vk_helper.newsfeed.search(**parameters)
            news_feed_results = response.get("items", [])
            num_collected += len(news_feed_results)

            # Flesh out profiles and groups
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for result in news_feed_results:
                result.update({"author_profile": author_profiles.get(result.get("from_id"), {})})

            yield news_feed_results

            # Stop when there is no further page, or when a page came back
            # empty (which would otherwise loop without making progress)
            next_from = response.get("next_from")
            if not next_from or not news_feed_results:
                break

            # The cursor must be sent along, or VK returns the first page again
            parameters["start_from"] = next_from

    def collect_all_comments(self, vk_helper, owner_id, post_id):
        """
        Collects all comments and replies to a VK post

        Every top-level comment is yielded first, followed by its replies.
        Comments arrive with up to 10 replies embedded; when more exist, the
        rest are fetched and appended to the embedded list.

        :param Object vk_helper:           Authorized vk_api.VkApi
        :param int owner_id:               Owner ID provided by post/comment/etc
        :param int post_id:                ID of post from which to collect comments
        :return generator:                 Yields comments and replies
        """
        # Collect top level comments from post
        comments = self.get_comments(vk_helper, owner_id, post_id=post_id)

        # Extract replies and collect more if needed
        for comment in comments:
            yield comment

            reply_count = comment.get("thread", {}).get("count", 0)
            replies = comment.get("thread", {}).get("items", [])
            if reply_count > 10 and len(replies) == 10:
                # Collect additional replies; the first one returned is the
                # last reply we already have, so it is dropped
                replies += self.get_comments(vk_helper, owner_id, comment_id=comment.get("id"), last_collected_id=replies[-1].get("id"))[1:]

            for reply in replies:
                yield reply
                if reply.get("thread"):
                    self.log.warning("VK Datasource issue with replies: additional depth needs to be handled; contact 4CAT devs")
                    # TODO: this will need modification if reply threads gain depth

    def get_comments(self, vk_helper, owner_id, post_id=None, comment_id=None, last_collected_id=None, **kwargs):
        """
        Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either
        post_id or comment_id, but not both.

        Pages of 100 comments are requested until a page is not full. If the
        API raises an error, it is logged to the dataset and whatever has been
        collected so far is returned.

        More information can be found here:
        https://vk.com/dev/wall.getComments

        :param Object vk_helper:           Authorized vk_api.VkApi
        :param int owner_id:               Owner ID provided by post/comment/etc
        :param int post_id:                ID of post from which to collect comments
        :param int comment_id:             ID of comment from which to collect comments
        :param int last_collected_id:      ID of the last comment to collected; used as start to continue collecting
                                           comments. That comment is returned again as the first item.
        :param kwargs:                     Ignored
        :return list:                      List of comments
        :raises ProcessorException:  If neither post_id nor comment_id is given
        :raises ProcessorInterruptedException:  If the worker is interrupted
        """
        if post_id is None and comment_id is None:
            raise ProcessorException("Must provide either post_id or comment_id to collect comments from VK")

        parameters = {
            "owner_id": owner_id,
            "need_likes": 1,
            "preview_length": 0,
            "extended": 1,
            "count": 100,
            "thread_items_count": 10,
            "fields": self.expanded_profile_fields,
        }
        if post_id:
            parameters.update({"post_id": post_id})
        if comment_id:
            parameters.update({"comment_id": comment_id})
        if last_collected_id:
            parameters.update({"start_comment_id": last_collected_id})

        comments = []
        first_page = True
        while True:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

            # Collect comments from VK
            try:
                response = vk_helper.wall.getComments(**parameters)
            except vk_api.exceptions.ApiError as e:
                self.dataset.log(f"Unable to collect comments for owner_id {owner_id} and {'post_id' if post_id is not None else 'comment_id'} {post_id if post_id is not None else comment_id}: {e}")
                break
            page = response.get("items", [])

            # Flesh out profiles and groups, for comments and embedded replies
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for comment in page:
                comment.update({"author_profile": author_profiles.get(comment.get("from_id"), {})})
                if comment.get("thread"):
                    for reply in comment.get("thread", {}).get("items", []):
                        reply.update({"author_profile": author_profiles.get(reply.get("from_id"), {})})

            # Follow-up pages start at the last comment already collected, so
            # their first item is a duplicate
            comments += page if first_page else page[1:]
            first_page = False

            # Check if there are potentially additional comments
            if not ((response.get("count") or 0) > 100 and len(page) == 100):
                break

            next_start = page[-1].get("id")
            if next_start == parameters.get("start_comment_id"):
                # no progress was made; stop instead of requesting the same page forever
                break
            parameters.update({"start_comment_id": next_start})

        return comments

    @staticmethod
    def expand_profile_fields(dict_of_profile_types):
        """
        Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to
        differentiate source of data later.

        :param dict dict_of_profile_types:  Lists of profile dicts, keyed by
          the name of the profile type
        :return dict:  Profiles keyed by their `id`
        :raises ProcessorException:  If a profile has no `id` or the same `id`
          occurs more than once
        """
        # NOTE(review): posts are looked up by `from_id`; confirm against the VK
        # docs whether group authors use the same sign for `from_id` and `id`
        author_types = {}
        for profile_type, profiles in dict_of_profile_types.items():
            for profile in profiles:
                if "id" not in profile:
                    raise ProcessorException("Profile missing id field; VK data format incorrect/changed")
                elif profile.get("id") in author_types:
                    raise ProcessorException("Profile id duplicated across profile types; unable to combine profiles")
                profile.update({"4CAT_author_profile_type": profile_type})
                author_types[profile.get("id")] = profile
        return author_types

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate input for a dataset query on the VK data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If no query is given or the date
          range ends before it starts
        """
        # Please provide something...
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # the dates need to make sense as a range to search within
        # but, on VK, you can also specify before *or* after only
        after, before = query.get("daterange") or (None, None)
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # TODO: test username and password?

        # if we made it this far, the query can be executed
        params = {
            "query":  query.get("query"),
            "query_type": query.get("query_type"),
            "amount": query.get("amount"),
            "include_comments": query.get("include_comments"),
            "min_date": after,
            "max_date": before,
            "username": query.get("username"),
            "password": query.get("password"),
        }

        return params

    @staticmethod
    def map_item(item):
        """
        Map a nested VK object to a flat dictionary

        :param item:  VK object as originally returned by the VK API
        :return dict:  Dictionary in the format expected by 4CAT
        """
        vk_item_time = datetime.datetime.fromtimestamp(item.get('date'))

        # Process attachments
        photos = []
        videos = []
        audio = []
        links = []
        docs = []
        for attachment in item.get("attachments", []):
            attachment_type = attachment.get("type")
            # The payload is stored under a key named after the type; skip
            # attachments where it is missing instead of crashing
            payload = attachment.get(attachment_type)
            if not isinstance(payload, dict):
                continue

            if attachment_type == "photo":
                if payload.get("sizes"):
                    # use the widest available rendition
                    photos.append(max(payload.get("sizes"), key=lambda d: d.get('width', 0)).get('url'))
                else:
                    photos.append(str(payload))
            elif attachment_type == "video":
                # TODO: can I get the actual URL? Does not seem like it...
                videos.append(f"https://vk.com/video{payload.get('owner_id')}_{payload.get('id')}")
            elif attachment_type == "audio":
                # TODO: Seem unable to create the URL with provided information...
                audio.append(f"{payload.get('artist')} - {payload.get('title')}")
            elif attachment_type == "link":
                links.append(payload.get('url', str(payload)))
            elif attachment_type == "doc":
                docs.append(payload.get('url', str(payload)))

        # Use 4cat_item_type to populate different fields
        thread_id = ""
        in_reply_to_user = ""
        in_reply_to_comment_id = ""
        if item.get("4cat_item_type") == "post":
            thread_id = item.get("id")
        elif item.get("4cat_item_type") == "comment":
            thread_id = item.get("post_id")
            in_reply_to_user = item.get("reply_to_user")
            in_reply_to_comment_id = item.get("reply_to_comment")

        author_profile = item.get("author_profile", {})
        profile_type = author_profile.get("4CAT_author_profile_type")
        profile_source = "user" if profile_type == "profile" else "community" if profile_type == "group" else "N/A"
        # Use source of author profile if "type" not present (e.g., in users profiles do not seem to have type)
        author_type = author_profile.get("type", profile_source)

        # Numeric flags are translated to labels; unknown values pass through
        # unchanged and missing ones become "N/A"
        sex = author_profile.get("sex")
        is_admin = author_profile.get("is_admin")
        is_advertiser = author_profile.get("is_advertiser")
        is_closed = author_profile.get("is_closed")

        return MappedItem({
            "id": item.get("id"),
            "thread_id": thread_id,
            "timestamp": vk_item_time.strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(vk_item_time.timestamp()),
            "link": f"https://vk.com/wall{item.get('owner_id')}_{item.get('id')}",
            "item_type": item.get("4cat_item_type"),
            "body": item.get("text"),
            "author_id": item.get("from_id"),
            "author_type": author_type,
            "author_screen_name": author_profile.get("screen_name"),
            "author_name": author_profile.get("name", " ".join([author_profile.get("first_name", ""), author_profile.get("last_name", "")])),
            "author_sex": "F" if sex == 1 else "M" if sex == 2 else "Not Specified" if sex == 0 else author_profile.get("sex", "N/A"),
            "author_city": author_profile.get("city", {}).get("title", ""),
            "author_country": author_profile.get("country", {}).get("title", ""),
            "author_photo": author_profile.get("photo_200",
                                               author_profile.get("photo_100", author_profile.get("photo_50", ""))),
            "author_is_admin": True if is_admin == 1 else False if is_admin == 0 else author_profile.get("is_admin", "N/A"),
            "author_is_advertiser": True if is_advertiser == 1 else False if is_advertiser == 0 else author_profile.get("is_advertiser", "N/A"),
            # the field requested from VK is "deactivated"; "is_deactivated" is kept as fallback
            "author_deactivated": author_profile.get("deactivated", author_profile.get("is_deactivated", False)),
            "author_privacy_is_closed": 'closed' if is_closed == 1 else 'open' if is_closed == 0 else 'private' if is_closed == 2 else author_profile.get("is_closed", "N/A"),
            "author_followers": author_profile.get("followers_count", author_profile.get("members_count", "N/A")),
            "in_reply_to_user": in_reply_to_user,
            "in_reply_to_comment_id": in_reply_to_comment_id,
            "source": item.get("post_source", {}).get("type"),
            "views": item.get("views", {}).get("count"),
            "likes": item.get("likes", {}).get("count"),
            "post_comments": item.get("comments", {}).get("count"),
            "edited": datetime.datetime.fromtimestamp(item.get("edited")).strftime("%Y-%m-%d %H:%M:%S") if item.get("edited", False) else False,
            "photos": ", ".join(photos),
            "videos": ", ".join(videos),
            "audio": ", ".join(audio),
            "links": ", ".join(links),
            "docs": ", ".join(docs),
            "subject": "",
        })
Get posts via the VK API
@classmethod
def get_options(cls, parent_dataset=None, config=None):
    """
    Build the option definitions for the VK data source

    The returned dictionary is ordered; the keys are the option names and the
    values describe each option via `UserInput` option types.

    :param parent_dataset:  Should always be None
    :param config:  Not read by this method
    :return dict:  Data source options
    """
    def info_block(text):
        # Read-only explanatory text
        return {"type": UserInput.OPTION_INFO, "help": text}

    def credential_field(label):
        # Login fields are flagged as both sensitive and cached
        return {"type": UserInput.OPTION_TEXT, "sensitive": True, "cache": True, "help": label}

    options = {}

    options["intro-1"] = info_block(
        "This data source uses VK's [API](https://vk.com/dev/first_guide) and a python "
        "[wrapper](https://github.com/python273/vk_api) to request information from VK using your "
        "username and password."
    )
    options["query_type"] = {
        "type": UserInput.OPTION_CHOICE,
        "help": "Query Type",
        "options": {"newsfeed": "News Feed search"},
        "default": "newsfeed",
    }

    options["intro-2"] = info_block("Your username and password will be deleted after your query is complete.")
    options["username"] = credential_field("VK Username")
    options["password"] = credential_field("VK Password")

    options["intro-3"] = info_block("Enter the text to search for below.")
    options["query"] = {"type": UserInput.OPTION_TEXT_LARGE, "help": "Query"}
    options["amount"] = {
        "type": UserInput.OPTION_TEXT,
        "help": "Max items to retrieve",
        "min": 0,
        "max": 1000,
        "default": 100,
    }
    options["include_comments"] = {
        "type": UserInput.OPTION_TOGGLE,
        "help": "Include post comments",
        "default": False,
        "tooltip": "",
    }

    options["divider-2"] = {"type": UserInput.OPTION_DIVIDER}
    options["daterange-info"] = info_block(
        "VK daterange defaults vary by type of query. For the News Feed, posts are returned starting "
        "with the most recent and working backwards."
    )
    options["daterange"] = {"type": UserInput.OPTION_DATERANGE, "help": "Date range"}

    return options
Get VK data source options
Parameters
- config:
- parent_dataset: Should always be None
Returns
Data source options
def get_items(self, query):
    """
    Collect posts (and optionally their comments) via the VK API

    Logs in with the username and password stored in the dataset parameters,
    runs the configured query type and yields every collected item. Each
    yielded item is tagged with a `4cat_item_type` of `post` or `comment`.

    :param query:  Not used; all settings are read from `self.parameters`
    :return generator:  Yields VK items as dictionaries
    :raises ProcessorInterruptedException:  If the worker is interrupted
      between result batches
    """
    username = self.parameters.get("username")
    password = self.parameters.get("password")
    if username is None or password is None:
        self.dataset.update_status(
            "VK query failed or was interrupted; please create new query in order to provide username and password again.",
            is_final=True)
        return

    self.dataset.update_status("Logging in to VK")
    try:
        vk_session = self.login(username, password)
    except vk_api.exceptions.AuthError as e:
        self.log.warning(f"VK Auth Issues: {e}")
        self.dataset.update_status(f"VK unable to authorize user: {e}", is_final=True)
        return

    query_type = self.parameters.get("query_type")
    query = self.parameters.get("query")
    include_comments = self.parameters.get("include_comments", False)
    max_amount = self.parameters.get("amount")

    if query_type == "newsfeed":
        query_parameters = {"query": query, "max_amount": max_amount}

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters["start_time"] = self.parameters.get("min_date")
        if self.parameters.get("max_date"):
            query_parameters["end_time"] = self.parameters.get("max_date")

        vk_helper = vk_session.get_api()

        # Collect Newsfeed results
        num_results = 0
        self.dataset.update_status("Submitting query...")
        for i, result_batch in enumerate(self.search_newsfeed(vk_helper, **query_parameters)):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching newsfeed data from the VK API")

            self.dataset.update_status(f"Processing results batch {i+1}")
            for result in result_batch:
                result.update({"4cat_item_type": "post"})
                yield result
                num_results += 1

                if include_comments:
                    for comment in self.collect_all_comments(vk_helper, owner_id=result.get("owner_id"),
                                                             post_id=result.get("id")):
                        comment.update({"4cat_item_type": "comment"})
                        yield comment

            self.dataset.update_status(f"Received {num_results} results of max {max_amount} from the VK API")
            # The amount option allows 0 (and may be unset); a ratio is
            # meaningless then and dividing would raise ZeroDivisionError
            if max_amount:
                self.dataset.update_progress(num_results / max_amount)
Use the VK API
Parameters
- query:
Returns
def login(self, username, password):
    """
    Login and authenticate user

    The vk_api session is given a per-user config file inside the configured
    `PATH_SESSIONS` folder.

    :param str username:  VK username
    :param str password:  VK password
    :return:  The `vk_api.VkApi` session after `auth()` has been called
    """
    import re  # local import: only needed to build a safe file name

    # The username is user-supplied; strip anything that could act as a path
    # separator so the session file cannot be placed outside PATH_SESSIONS.
    # Letters, digits and the characters common in e-mail addresses and phone
    # numbers are kept, so ordinary usernames map to the same file as before.
    safe_username = re.sub(r"[^\w.@+-]", "_", str(username))
    session_file = self.config.get("PATH_SESSIONS").joinpath(safe_username + "-vk_config.json")

    vk_session = vk_api.VkApi(username, password, config_filename=session_file)
    vk_session.auth()

    return vk_session
Login and authenticate user
def search_newsfeed(self, vk_helper, query, max_amount, num_collected=0, start_time=None, end_time=None, **kwargs):
    """
    Collects all newsfeed posts

    Requests pages of up to 200 posts and follows the `next_from` cursor of
    each response until `max_amount` posts were collected or VK reports no
    further pages. Each post gets an `author_profile` key with the matching
    profile or group from the response (or an empty dict).

    :param Object vk_helper:  Authorized vk_api.VkApi
    :param str query:  String representing the search query
    :param int max_amount:  Max number of posts to collect
    :param int num_collected:  Number of previously collected results
    :param int start_time:  Timestamp for earliest post
    :param int end_time:  Timestamp for latest post
    :param kwargs:  Optional `start_from` cursor to resume from; any other
      keyword is ignored
    :return generator:  Yields groups of posts
    """
    parameters = {
        "q": query,
        "extended": 1,
        "fields": self.expanded_profile_fields,
    }
    if start_time:
        parameters["start_time"] = start_time
    if end_time:
        parameters["end_time"] = end_time
    if kwargs.get("start_from"):
        parameters["start_from"] = kwargs["start_from"]

    while True:
        # never ask for more than is still needed, nor more than 200 at once
        parameters["count"] = min(max_amount - num_collected, 200)

        response = vk_helper.newsfeed.search(**parameters)
        news_feed_results = response.get("items", [])
        num_collected += len(news_feed_results)

        # Flesh out profiles and groups
        author_profiles = self.expand_profile_fields(
            {"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
        for result in news_feed_results:
            result["author_profile"] = author_profiles.get(result.get("from_id"), {})

        yield news_feed_results

        # Collect additional results: the cursor has to be part of the next
        # request, otherwise the first page would be fetched again. An empty
        # page also ends the search so a stale cursor cannot loop forever.
        next_from = response.get("next_from")
        if not next_from or not news_feed_results or num_collected >= max_amount:
            break
        parameters["start_from"] = next_from
Collects all newsfeed posts
Parameters
- Object vk_helper: Authorized vk_api.VkApi
- str query: String representing the search query
- int max_amount: Max number of posts to collect
- int num_collected: Number of previously collected results
- int start_time: Timestamp for earliest post
- int end_time: Timestamp for latest post
Returns
Yields groups of posts
def collect_all_comments(self, vk_helper, owner_id, post_id):
    """
    Yield every comment on a VK post, each followed by its replies

    Top-level comments come with at most ten embedded replies; when a thread
    reports more than that, the remaining replies are requested separately
    and appended to the embedded list.

    :param Object vk_helper:  Authorized vk_api.VkApi
    :param int owner_id:  Owner ID provided by post/comment/etc
    :param int post_id:  ID of post from which to collect comments
    :return generator:  Yields comments and replies
    """
    for top_level in self.get_comments(vk_helper, owner_id, post_id=post_id):
        yield top_level

        total_replies = top_level.get("thread", {}).get("count", 0)
        thread_items = top_level.get("thread", {}).get("items", [])

        embedded_list_is_full = len(thread_items) == 10
        if total_replies > 10 and embedded_list_is_full:
            # Continue from the last embedded reply; the first item of the
            # follow-up is that same reply again, so it is skipped
            follow_up = self.get_comments(vk_helper, owner_id, comment_id=top_level.get("id"),
                                          last_collected_id=thread_items[-1].get("id"))
            thread_items.extend(follow_up[1:])

        for thread_item in thread_items:
            yield thread_item
            if thread_item.get("thread"):
                # TODO: this will need modification if reply threads gain depth
                self.log.warning("VK Datasource issue with replies: additional depth needs to be handled; contact 4CAT devs")
Collects all comments and replies to a VK post
Parameters
- Object vk_helper: Authorized vk_api.VkApi
- int owner_id: Owner ID provided by post/comment/etc
- int post_id: ID of post from which to collect comments
Returns
Yields comments and replies
def get_comments(self, vk_helper, owner_id, post_id=None, comment_id=None, last_collected_id=None, **kwargs):
    """
    Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either
    post_id or comment_id, but not both.

    Comments are requested 100 at a time. While the reported total exceeds
    100 and a full page was returned, the next page is requested starting at
    the last collected comment; that comment is returned again as the first
    item of the next page and is dropped there.

    More information can be found here:
    https://vk.com/dev/wall.getComments

    :param Object vk_helper:  Authorized vk_api.VkApi
    :param int owner_id:  Owner ID provided by post/comment/etc
    :param int post_id:  ID of post from which to collect comments
    :param int comment_id:  ID of comment from which to collect comments
    :param int last_collected_id:  ID of the last comment collected; used as
      the starting point to continue collecting comments. The comment with
      this ID is included as the first returned item.
    :param kwargs:  Ignored; accepted for backward compatibility
    :return list:  List of comments; whatever was collected so far (possibly
      an empty list) if the VK API returns an error
    :raises ProcessorInterruptedException:  If the worker was interrupted
    :raises ProcessorException:  If neither post_id nor comment_id is given
    """
    if self.interrupted:
        raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

    if post_id is None and comment_id is None:
        raise ProcessorException("Must provide either post_id or comment_id to collect comments from VK")

    page_size = 100
    parameters = {
        "owner_id": owner_id,
        "need_likes": 1,
        "preview_length": 0,
        "extended": 1,
        "count": page_size,
        "thread_items_count": 10,
        "fields": self.expanded_profile_fields,
    }
    if post_id:
        parameters["post_id"] = post_id
    if comment_id:
        parameters["comment_id"] = comment_id
    if last_collected_id:
        parameters["start_comment_id"] = last_collected_id

    comments = []
    is_follow_up_page = False
    while True:
        # Collect comments from VK
        try:
            response = vk_helper.wall.getComments(**parameters)
        except vk_api.exceptions.ApiError as e:
            self.dataset.log(f"Unable to collect comments for owner_id {owner_id} and {'post_id' if post_id is not None else 'comment_id'} {post_id if post_id is not None else comment_id}: {e}")
            break
        page = response.get("items", [])

        # Flesh out profiles and groups, for comments and embedded replies
        author_profiles = self.expand_profile_fields(
            {"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
        for comment in page:
            comment["author_profile"] = author_profiles.get(comment.get("from_id"), {})
            if comment.get("thread"):
                for reply in comment.get("thread", {}).get("items", []):
                    reply["author_profile"] = author_profiles.get(reply.get("from_id"), {})

        # First item of a follow-up page duplicates the last collected comment
        comments.extend(page[1:] if is_follow_up_page else page)

        # Check if there are potentially additional comments
        total = response.get("count") or 0
        if total <= page_size or len(page) != page_size:
            break

        next_start = page[-1].get("id")
        if next_start == parameters.get("start_comment_id"):
            # no progress was made; stop instead of requesting the same page
            break

        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

        parameters["start_comment_id"] = next_start
        is_follow_up_page = True

    return comments
Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either post_id or comment_id, but not both.
More information can be found here: https://vk.com/dev/wall.getComments
Parameters
- Object vk_helper: Authorized vk_api.VkApi
- int owner_id: Owner ID provided by post/comment/etc
- int post_id: ID of post from which to collect comments
- int comment_id: ID of comment from which to collect comments
- int last_collected_id: ID of the last comment collected; used as the starting point to continue collecting comments
Returns
List of comments
310 @ staticmethod 311 def expand_profile_fields(dict_of_profile_types): 312 """ 313 Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to 314 differentiate source of data later. 315 """ 316 author_types = {} 317 for profile_type, profiles in dict_of_profile_types.items(): 318 for profile in profiles: 319 if "id" not in profile: 320 raise ProcessorException("Profile missing id field; VK data format incorrect/changed") 321 elif profile.get("id") in author_types: 322 raise ProcessorException("Profile id duplicated across profile types; unable to combine profiles") 323 profile.update({"4CAT_author_profile_type": profile_type}) 324 author_types[profile.get("id")] = profile 325 return author_types
Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to differentiate source of data later.
327 @staticmethod 328 def validate_query(query, request, config): 329 """ 330 Validate input for a dataset query on the VK data source. 331 332 Will raise a QueryParametersException if invalid parameters are 333 encountered. Parameters are additionally sanitised. 334 335 :param dict query: Query parameters, from client-side. 336 :param request: Flask request 337 :param ConfigManager|None config: Configuration reader (context-aware) 338 :return dict: Safe query parameters 339 """ 340 # Please provide something... 341 if not query.get("query", None): 342 raise QueryParametersException("Please provide a query.") 343 344 # the dates need to make sense as a range to search within 345 # but, on VK, you can also specify before *or* after only 346 after, before = query.get("daterange") 347 if before and after and before < after: 348 raise QueryParametersException("Date range must start before it ends") 349 350 # TODO: test username and password? 351 352 # if we made it this far, the query can be executed 353 params = { 354 "query": query.get("query"), 355 "query_type": query.get("query_type"), 356 "amount": query.get("amount"), 357 "include_comments": query.get("include_comments"), 358 "min_date": after, 359 "max_date": before, 360 "username": query.get("username"), 361 "password": query.get("password"), 362 } 363 364 return params
Validate input for a dataset query on the VK data source.
Will raise a QueryParametersException if invalid parameters are encountered. Parameters are additionally sanitised.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
@staticmethod
def map_item(item):
    """
    Map a nested VK object to a flat dictionary

    :param item:  VK object as originally returned by the VK API, with the
      `4cat_item_type` and `author_profile` keys added during collection
    :return dict:  Dictionary in the format expected by 4CAT
    """
    vk_item_time = datetime.datetime.fromtimestamp(item.get("date"))

    # Process attachments
    photos = []
    videos = []
    audio = []
    links = []
    docs = []
    for attachment in item.get("attachments", []):
        attachment_type = attachment.get("type")
        attachment = attachment.get(attachment_type)
        if not attachment:
            # no payload stored under the attachment's own type; nothing to map
            continue

        if attachment_type == "photo":
            if attachment.get("sizes"):
                # use the widest available rendition
                largest = max(attachment.get("sizes"), key=lambda size: size.get("width", 0))
                photos.append(largest.get("url"))
            else:
                photos.append(str(attachment))
        elif attachment_type == "video":
            # TODO: can I get the actual URL? Does not seem like it...
            videos.append(f"https://vk.com/video{attachment.get('owner_id')}_{attachment.get('id')}")
        elif attachment_type == "audio":
            # TODO: Seem unable to create the URL with provided information...
            audio.append(f"{attachment.get('artist')} - {attachment.get('title')}")
        elif attachment_type == "link":
            links.append(attachment.get("url", str(attachment)))
        elif attachment_type == "doc":
            docs.append(attachment.get("url", str(attachment)))

    # Use 4cat_item_type to populate different fields
    thread_id = ""
    in_reply_to_user = ""
    in_reply_to_comment_id = ""
    if item.get("4cat_item_type") == "post":
        thread_id = item.get("id")
    elif item.get("4cat_item_type") == "comment":
        thread_id = item.get("post_id")
        in_reply_to_user = item.get("reply_to_user")
        in_reply_to_comment_id = item.get("reply_to_comment")

    author_profile = item.get("author_profile", {})

    # expand_profile_fields() stores the name of the response list the profile
    # came from ("profiles"/"groups"); the singular forms are kept for items
    # that may have been stored with those values
    profile_source = {
        "profile": "user",
        "profiles": "user",
        "group": "community",
        "groups": "community",
    }.get(author_profile.get("4CAT_author_profile_type"), "N/A")
    # Use source of author profile if "type" not present (e.g., in users profiles do not seem to have type)
    author_type = author_profile.get("type", profile_source)

    def labelled(field, labels):
        # Translate a coded profile field: "N/A" if the field is absent, the
        # label if the value is a known code, else the raw value
        if field not in author_profile:
            return "N/A"
        value = author_profile[field]
        for code, label in labels:
            if value == code:
                return label
        return value

    flag_labels = ((1, True), (0, False))

    # The collector requests the `deactivated` field; `is_deactivated` is
    # still read as a fallback
    deactivated = author_profile.get("deactivated", author_profile.get("is_deactivated", False))

    if item.get("edited", False):
        edited = datetime.datetime.fromtimestamp(item.get("edited")).strftime("%Y-%m-%d %H:%M:%S")
    else:
        edited = False

    return MappedItem({
        "id": item.get("id"),
        "thread_id": thread_id,
        "timestamp": vk_item_time.strftime("%Y-%m-%d %H:%M:%S"),
        "unix_timestamp": int(vk_item_time.timestamp()),
        # NOTE(review): for comments `id` is the comment ID, so this presumably
        # does not resolve to the comment itself - confirm the desired URL
        "link": f"https://vk.com/wall{item.get('owner_id')}_{item.get('id')}",
        "item_type": item.get("4cat_item_type"),
        "body": item.get("text"),
        "author_id": item.get("from_id"),
        "author_type": author_type,
        "author_screen_name": author_profile.get("screen_name"),
        "author_name": author_profile.get("name", " ".join(
            [author_profile.get("first_name", ""), author_profile.get("last_name", "")])),
        "author_sex": labelled("sex", ((1, "F"), (2, "M"), (0, "Not Specified"))),
        "author_city": author_profile.get("city", {}).get("title", ""),
        "author_country": author_profile.get("country", {}).get("title", ""),
        "author_photo": author_profile.get("photo_200",
                                           author_profile.get("photo_100", author_profile.get("photo_50", ""))),
        "author_is_admin": labelled("is_admin", flag_labels),
        "author_is_advertiser": labelled("is_advertiser", flag_labels),
        "author_deactivated": deactivated,
        "author_privacy_is_closed": labelled("is_closed", ((1, "closed"), (0, "open"), (2, "private"))),
        "author_followers": author_profile.get("followers_count", author_profile.get("members_count", "N/A")),
        "in_reply_to_user": in_reply_to_user,
        "in_reply_to_comment_id": in_reply_to_comment_id,
        "source": item.get("post_source", {}).get("type"),
        "views": item.get("views", {}).get("count"),
        "likes": item.get("likes", {}).get("count"),
        "post_comments": item.get("comments", {}).get("count"),
        "edited": edited,
        "photos": ", ".join(photos),
        "videos": ", ".join(videos),
        "audio": ", ".join(audio),
        "links": ", ".join(links),
        "docs": ", ".join(docs),
        "subject": "",
    })
Map a nested VK object to a flat dictionary
Parameters
- item: VK object as originally returned by the VK API
Returns
Dictionary in the format expected by 4CAT
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- run_interruptable_process
- get_queue_id
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- config
- is_running_in_preset
- filepath
- for_cleanup
- work
- after_process
- clean_up_on_error
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor