datasources.vk.search_vk
VK keyword search
"""
VK keyword search
"""
import datetime
from pathlib import Path

import vk_api

from backend.lib.search import Search
from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException
from common.lib.helpers import UserInput
from common.lib.item_mapping import MappedItem


class SearchVK(Search):
    """
    Get posts via the VK API
    """
    type = "vk-search"  # job ID
    title = "VK"
    extension = "ndjson"
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    previous_request = 0
    import_issues = True

    references = [
        "[VK API documentation](https://vk.com/dev/first_guide)",
        "[Python API wrapper](https://github.com/python273/vk_api)"
    ]

    # Profile fields requested alongside every post/comment request; see
    # https://vk.com/dev/objects/user & https://vk.com/dev/objects/group
    expanded_profile_fields = "id,screen_name,first_name,last_name,name,deactivated,is_closed,is_admin,sex,city,country,photo_200,photo_100,photo_50,followers_count,members_count"

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get VK data source options

        :param config:  Not used by this data source
        :param parent_dataset:  Should always be None
        :return dict:  Data source options
        """
        api_blurb = ("This data source uses VK's [API](https://vk.com/dev/first_guide) and a python "
                     "[wrapper](https://github.com/python273/vk_api) to request information from VK using your "
                     "username and password.")
        daterange_blurb = ("VK daterange defaults vary by type of query. For the News Feed, posts are returned starting "
                           "with the most recent and working backwards.")

        # Both credential fields share the same settings and differ only in label
        credential = {"type": UserInput.OPTION_TEXT, "sensitive": True, "cache": True}

        return {
            "intro-1": {"type": UserInput.OPTION_INFO, "help": api_blurb},
            "query_type": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Query Type",
                "options": {"newsfeed": "News Feed search"},
                "default": "newsfeed"
            },
            "intro-2": {
                "type": UserInput.OPTION_INFO,
                "help": "Your username and password will be deleted after your query is complete."
            },
            "username": {**credential, "help": "VK Username"},
            "password": {**credential, "help": "VK Password"},
            "intro-3": {"type": UserInput.OPTION_INFO, "help": "Enter the text to search for below."},
            "query": {"type": UserInput.OPTION_TEXT_LARGE, "help": "Query"},
            "amount": {
                "type": UserInput.OPTION_TEXT,
                "help": "Max items to retrieve",
                "min": 0,
                "max": 1000,
                "default": 100
            },
            "include_comments": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Include post comments",
                "default": False,
                "tooltip": ""
            },
            "divider-2": {"type": UserInput.OPTION_DIVIDER},
            "daterange-info": {"type": UserInput.OPTION_INFO, "help": daterange_blurb},
            "daterange": {"type": UserInput.OPTION_DATERANGE, "help": "Date range"},
        }

    def get_items(self, query):
        """
        Log in to VK and yield the posts (and optionally comments) for the query

        All settings are read from `self.parameters`; every yielded item gets
        a `4cat_item_type` key (`post` or `comment`) so `map_item` can tell
        them apart later.

        :param query:  Not used; parameters are read from `self.parameters`
        :return generator:  Yields VK posts and, if requested, their comments
        """
        username = self.parameters.get("username")
        password = self.parameters.get("password")
        if username is None or password is None:
            self.dataset.update_status(
                "VK query failed or was interrupted; please create new query in order to provide username and password again.",
                is_final=True)
            return

        self.dataset.update_status("Logging in to VK")
        try:
            vk_session = self.login(username, password)
        except vk_api.exceptions.AuthError as e:
            self.log.warning(f"VK Auth Issues: {e}")
            self.dataset.update_status(f"VK unable to authorize user: {e}", is_final=True)
            return

        if self.parameters.get("query_type") != "newsfeed":
            # Only the News Feed search is implemented
            return

        include_comments = self.parameters.get("include_comments", False)
        max_amount = self.parameters.get("amount")
        query_parameters = {"query": self.parameters.get("query"),
                            "max_amount": max_amount}

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['start_time'] = self.parameters.get("min_date")
        if self.parameters.get("max_date"):
            query_parameters['end_time'] = self.parameters.get("max_date")

        vk_helper = vk_session.get_api()

        # Collect Newsfeed results
        num_results = 0
        self.dataset.update_status("Submitting query...")
        for i, result_batch in enumerate(self.search_newsfeed(vk_helper, **query_parameters)):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching newsfeed data from the VK API")

            self.dataset.update_status(f"Processing results batch {i+1}")
            for result in result_batch:
                result.update({'4cat_item_type': 'post'})
                yield result
                num_results += 1

                if include_comments:
                    for comment in self.collect_all_comments(vk_helper, owner_id=result.get("owner_id"), post_id=result.get("id")):
                        comment.update({'4cat_item_type': 'comment'})
                        yield comment

                self.dataset.update_status(f"Received {num_results} results of max {self.parameters.get('amount')} from the VK API")
                if max_amount:
                    # "amount" may be 0 according to the options; avoid dividing by it
                    self.dataset.update_progress(num_results / max_amount)

    def login(self, username, password):
        """
        Login and authenticate user

        The session configuration is stored per user in the configured
        sessions folder. Errors raised by `vk_api` while authenticating (such
        as the `AuthError` handled in `get_items`) are not caught here.

        :param str username:  VK username
        :param str password:  VK password
        :return:  Authenticated `vk_api.VkApi` session
        """
        session_file = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), username + "-vk_config.json")
        vk_session = vk_api.VkApi(username, password, config_filename=session_file)
        vk_session.auth()

        return vk_session

    def search_newsfeed(self, vk_helper, query, max_amount, num_collected=0, start_time=None, end_time=None, **kwargs):
        """
        Collects all newsfeed posts

        Results are requested page by page (at most 200 per request) until
        `max_amount` posts have been collected or VK reports no further page.
        The pagination token returned by VK (`next_from`) is sent back as
        `start_from`; previously it was lost between requests, so every page
        repeated the first one.

        :param Object vk_helper:  API helper of an authorised session, as returned by `VkApi.get_api()`
        :param str query:  String representing the search query
        :param int max_amount:  Max number of posts to collect
        :param int num_collected:  Number of previously collected results
        :param int start_time:  Timestamp for earliest post
        :param int end_time:  Timestamp for latest post
        :param kwargs:  May contain `start_from`, a pagination token to resume from; other keys are ignored
        :return generator:  Yields groups of posts
        """
        start_from = kwargs.get("start_from")

        while True:
            parameters = {
                "q": query,
                "extended": 1,
                "count": min(max_amount - num_collected, 200),
                "fields": self.expanded_profile_fields,
            }
            if start_time:
                parameters["start_time"] = start_time
            if end_time:
                parameters["end_time"] = end_time
            if start_from:
                parameters["start_from"] = start_from

            response = vk_helper.newsfeed.search(**parameters)
            news_feed_results = response.get("items", [])
            num_collected += len(news_feed_results)

            # Flesh out profiles and groups
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for result in news_feed_results:
                result["author_profile"] = author_profiles.get(result.get("from_id"), {})

            yield news_feed_results

            # Stop when VK has no further page, when a page came back empty
            # (guards against looping forever) or when enough was collected
            start_from = response.get("next_from")
            if not start_from or not news_feed_results or num_collected >= max_amount:
                break

    def collect_all_comments(self, vk_helper, owner_id, post_id):
        """
        Collects all comments and replies to a VK post

        :param Object vk_helper:  API helper of an authorised session, as returned by `VkApi.get_api()`
        :param int owner_id:  Owner ID provided by post/comment/etc
        :param int post_id:  ID of post from which to collect comments
        :return generator:  Yields comments and replies
        """
        # Collect top level comments from post
        for comment in self.get_comments(vk_helper, owner_id, post_id=post_id):
            yield comment

            thread = comment.get("thread", {})
            replies = thread.get("items", [])
            # Comments are requested with 10 inline replies; a full set of 10
            # with a higher reply count means there are more to fetch
            if thread.get("count", 0) > 10 and len(replies) == 10:
                # First item returned is the reply we already have, so skip it
                more_replies = self.get_comments(vk_helper, owner_id, comment_id=comment.get("id"), last_collected_id=replies[-1].get("id"))[1:]
                replies = replies + more_replies

            for reply in replies:
                yield reply
                if reply.get("thread"):
                    self.log.warning("VK Datasource issue with replies: additional depth needs to be handled; contact 4CAT devs")
                    # TODO: this will need modification if reply threads gain depth

    def get_comments(self, vk_helper, owner_id, post_id=None, comment_id=None, last_collected_id=None, **kwargs):
        """
        Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either
        post_id or comment_id, but not both.

        Comments are requested 100 at a time. When more are available, the next
        request starts at the last collected comment, whose duplicate is then
        dropped. Previously the start ID was lost on the follow-up request, so
        the same page was requested again and again.

        If the API refuses a request, the error is logged to the dataset and
        the comments collected so far (possibly none) are returned.

        More information can be found here:
        https://vk.com/dev/wall.getComments

        :param Object vk_helper:  API helper of an authorised session, as returned by `VkApi.get_api()`
        :param int owner_id:  Owner ID provided by post/comment/etc
        :param int post_id:  ID of post from which to collect comments
        :param int comment_id:  ID of comment from which to collect comments
        :param int last_collected_id:  ID of the last comment collected; used as start to continue collecting comments.
                                       The comment with this ID is included as first item of the result.
        :param kwargs:  May contain `start_comment_id` as an alias of `last_collected_id`; other keys are ignored
        :return list:  List of comments
        :raises ProcessorException:  If neither post_id nor comment_id is given
        :raises ProcessorInterruptedException:  If the worker was interrupted
        """
        if post_id is None and comment_id is None:
            raise ProcessorException("Must provide either post_id or comment_id to collect comments from VK")

        parameters = {
            "owner_id": owner_id,
            "need_likes": 1,
            "preview_length": 0,
            "extended": 1,
            "count": 100,
            "thread_items_count": 10,
            "fields": self.expanded_profile_fields,
        }
        if post_id:
            parameters["post_id"] = post_id
        if comment_id:
            parameters["comment_id"] = comment_id

        start_comment_id = last_collected_id or kwargs.get("start_comment_id")
        comments = []
        first_page = True

        while True:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

            if start_comment_id:
                parameters["start_comment_id"] = start_comment_id

            # Collect comments from VK
            try:
                response = vk_helper.wall.getComments(**parameters)
            except vk_api.exceptions.ApiError as e:
                self.dataset.log(f"Unable to collect comments for owner_id {owner_id} and {'post_id' if post_id is not None else 'comment_id'} {post_id if post_id is not None else comment_id}: {e}")
                break
            page = response.get("items", [])

            # Flesh out profiles and groups, for comments and their inline replies
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for comment in page:
                comment["author_profile"] = author_profiles.get(comment.get("from_id"), {})
                if comment.get("thread"):
                    for reply in comment.get("thread", {}).get("items", []):
                        reply["author_profile"] = author_profiles.get(reply.get("from_id"), {})

            # Follow-up pages start with the comment we already collected
            new_comments = page if first_page else page[1:]
            comments.extend(new_comments)
            first_page = False

            # A full page and a higher total mean there may be more; stop if a
            # page added nothing new, so this cannot loop forever
            if (response.get("count") or 0) > 100 and len(page) == 100 and new_comments:
                start_comment_id = page[-1].get("id")
            else:
                break

        return comments

    @staticmethod
    def expand_profile_fields(dict_of_profile_types):
        """
        Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to
        differentiate source of data later.

        :param dict dict_of_profile_types:  Lists of profile objects, keyed by the type of profile (as used in this
                                            class: `profiles` and `groups`)
        :return dict:  Profile objects keyed by their `id`
        :raises ProcessorException:  If a profile has no `id` or an `id` occurs more than once
        """
        # NOTE(review): VK appears to report community authors with a negative
        # `from_id` while group objects carry a positive `id`; confirm whether
        # lookups by `from_id` can match groups and whether user and group ids
        # may legitimately collide here.
        author_types = {}
        for profile_type, profiles in dict_of_profile_types.items():
            for profile in profiles:
                if "id" not in profile:
                    raise ProcessorException("Profile missing id field; VK data format incorrect/changed")
                if profile["id"] in author_types:
                    raise ProcessorException("Profile id duplicated across profile types; unable to combine profiles")
                profile["4CAT_author_profile_type"] = profile_type
                author_types[profile["id"]] = profile
        return author_types

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate input for a dataset query on the VK data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If no query is given or the date range ends before it starts
        """
        # Please provide something...
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # the dates need to make sense as a range to search within
        # but, on VK, you can also specify before *or* after only
        # (a missing date range is treated as "no limits" instead of crashing)
        after, before = query.get("daterange") or (None, None)
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # TODO: test username and password?

        # if we made it this far, the query can be executed
        return {
            "query": query.get("query"),
            "query_type": query.get("query_type"),
            "amount": query.get("amount"),
            "include_comments": query.get("include_comments"),
            "min_date": after,
            "max_date": before,
            "username": query.get("username"),
            "password": query.get("password"),
        }

    @staticmethod
    def map_item(item):
        """
        Map a nested VK object to a flat dictionary

        :param item:  VK object as originally returned by the VK API, with the `4cat_item_type` and `author_profile`
                      keys added while collecting
        :return dict:  Dictionary in the format expected by 4CAT
        """
        vk_item_time = datetime.datetime.fromtimestamp(item.get('date'))

        # Process attachments
        photos = []
        videos = []
        audio = []
        links = []
        docs = []
        for attachment in item.get("attachments", []):
            attachment_type = attachment.get("type")
            details = attachment.get(attachment_type)
            if not details:
                # No payload stored under the attachment's type; nothing to map
                continue

            if attachment_type == "photo":
                sizes = details.get("sizes")
                if sizes:
                    # Use the URL of the widest version available
                    largest = max(sizes, key=lambda size: size.get("width", 0))
                    photos.append(largest.get("url", ""))
                else:
                    photos.append(str(details))
            elif attachment_type == "video":
                # TODO: can I get the actual URL? Does not seem like it...
                videos.append(f"https://vk.com/video{details.get('owner_id')}_{details.get('id')}")
            elif attachment_type == "audio":
                # TODO: Seem unable to create the URL with provided information...
                audio.append(f"{details.get('artist')} - {details.get('title')}")
            elif attachment_type == "link":
                links.append(details.get('url', str(details)))
            elif attachment_type == "doc":
                docs.append(details.get('url', str(details)))

        # Use 4cat_item_type to populate different fields
        thread_id = ""
        in_reply_to_user = ""
        in_reply_to_comment_id = ""
        if item.get("4cat_item_type") == "post":
            thread_id = item.get("id")
        elif item.get("4cat_item_type") == "comment":
            thread_id = item.get("post_id")
            in_reply_to_user = item.get("reply_to_user")
            in_reply_to_comment_id = item.get("reply_to_comment")

        author_profile = item.get("author_profile", {})

        # expand_profile_fields stores the plural names ("profiles"/"groups");
        # the singular forms are accepted as well
        profile_type = author_profile.get("4CAT_author_profile_type")
        if profile_type in ("profile", "profiles"):
            profile_source = "user"
        elif profile_type in ("group", "groups"):
            profile_source = "community"
        else:
            profile_source = "N/A"
        # Use source of author profile if "type" not present (e.g., in users profiles do not seem to have type)
        author_type = author_profile.get("type", profile_source)

        def decode(key, labels):
            """Translate a coded profile value to its label; unknown values are kept, missing ones become N/A"""
            value = author_profile.get(key, "N/A")
            for code, label in labels:
                if value == code:
                    return label
            return value

        return MappedItem({
            "id": item.get("id"),
            "thread_id": thread_id,
            "timestamp": vk_item_time.strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(vk_item_time.timestamp()),
            "link": f"https://vk.com/wall{item.get('owner_id')}_{item.get('id')}",
            "item_type": item.get("4cat_item_type"),
            "body": item.get("text"),
            "author_id": item.get("from_id"),
            "author_type": author_type,
            "author_screen_name": author_profile.get("screen_name"),
            "author_name": author_profile.get("name", " ".join([author_profile.get("first_name", ""), author_profile.get("last_name", "")])),
            "author_sex": decode("sex", ((1, "F"), (2, "M"), (0, "Not Specified"))),
            "author_city": author_profile.get("city", {}).get("title", ""),
            "author_country": author_profile.get("country", {}).get("title", ""),
            "author_photo": author_profile.get("photo_200",
                                               author_profile.get("photo_100", author_profile.get("photo_50", ""))),
            "author_is_admin": decode("is_admin", ((1, True), (0, False))),
            "author_is_advertiser": decode("is_advertiser", ((1, True), (0, False))),
            # The field requested from VK is "deactivated"; the old key is kept as fallback
            "author_deactivated": author_profile.get("deactivated", author_profile.get("is_deactivated", False)),
            "author_privacy_is_closed": decode("is_closed", ((1, "closed"), (0, "open"), (2, "private"))),
            "author_followers": author_profile.get("followers_count", author_profile.get("members_count", "N/A")),
            "in_reply_to_user": in_reply_to_user,
            "in_reply_to_comment_id": in_reply_to_comment_id,
            "source": item.get("post_source", {}).get("type"),
            "views": item.get("views", {}).get("count"),
            "likes": item.get("likes", {}).get("count"),
            "post_comments": item.get("comments", {}).get("count"),
            "edited": datetime.datetime.fromtimestamp(item.get("edited")).strftime("%Y-%m-%d %H:%M:%S") if item.get("edited", False) else False,
            "photos": ", ".join(photos),
            "videos": ", ".join(videos),
            "audio": ", ".join(audio),
            "links": ", ".join(links),
            "docs": ", ".join(docs),
            "subject": "",
        })
class SearchVK(Search):
    """
    Get posts via the VK API
    """
    type = "vk-search"  # job ID
    title = "VK"
    extension = "ndjson"
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    previous_request = 0
    import_issues = True

    references = [
        "[VK API documentation](https://vk.com/dev/first_guide)",
        "[Python API wrapper](https://github.com/python273/vk_api)"
    ]

    # Profile fields requested alongside every post/comment request; see
    # https://vk.com/dev/objects/user & https://vk.com/dev/objects/group
    expanded_profile_fields = "id,screen_name,first_name,last_name,name,deactivated,is_closed,is_admin,sex,city,country,photo_200,photo_100,photo_50,followers_count,members_count"

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get VK data source options

        :param config:  Not used by this data source
        :param parent_dataset:  Should always be None
        :return dict:  Data source options
        """
        api_blurb = ("This data source uses VK's [API](https://vk.com/dev/first_guide) and a python "
                     "[wrapper](https://github.com/python273/vk_api) to request information from VK using your "
                     "username and password.")
        daterange_blurb = ("VK daterange defaults vary by type of query. For the News Feed, posts are returned starting "
                           "with the most recent and working backwards.")

        # Both credential fields share the same settings and differ only in label
        credential = {"type": UserInput.OPTION_TEXT, "sensitive": True, "cache": True}

        return {
            "intro-1": {"type": UserInput.OPTION_INFO, "help": api_blurb},
            "query_type": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Query Type",
                "options": {"newsfeed": "News Feed search"},
                "default": "newsfeed"
            },
            "intro-2": {
                "type": UserInput.OPTION_INFO,
                "help": "Your username and password will be deleted after your query is complete."
            },
            "username": {**credential, "help": "VK Username"},
            "password": {**credential, "help": "VK Password"},
            "intro-3": {"type": UserInput.OPTION_INFO, "help": "Enter the text to search for below."},
            "query": {"type": UserInput.OPTION_TEXT_LARGE, "help": "Query"},
            "amount": {
                "type": UserInput.OPTION_TEXT,
                "help": "Max items to retrieve",
                "min": 0,
                "max": 1000,
                "default": 100
            },
            "include_comments": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Include post comments",
                "default": False,
                "tooltip": ""
            },
            "divider-2": {"type": UserInput.OPTION_DIVIDER},
            "daterange-info": {"type": UserInput.OPTION_INFO, "help": daterange_blurb},
            "daterange": {"type": UserInput.OPTION_DATERANGE, "help": "Date range"},
        }

    def get_items(self, query):
        """
        Log in to VK and yield the posts (and optionally comments) for the query

        All settings are read from `self.parameters`; every yielded item gets
        a `4cat_item_type` key (`post` or `comment`) so `map_item` can tell
        them apart later.

        :param query:  Not used; parameters are read from `self.parameters`
        :return generator:  Yields VK posts and, if requested, their comments
        """
        username = self.parameters.get("username")
        password = self.parameters.get("password")
        if username is None or password is None:
            self.dataset.update_status(
                "VK query failed or was interrupted; please create new query in order to provide username and password again.",
                is_final=True)
            return

        self.dataset.update_status("Logging in to VK")
        try:
            vk_session = self.login(username, password)
        except vk_api.exceptions.AuthError as e:
            self.log.warning(f"VK Auth Issues: {e}")
            self.dataset.update_status(f"VK unable to authorize user: {e}", is_final=True)
            return

        if self.parameters.get("query_type") != "newsfeed":
            # Only the News Feed search is implemented
            return

        include_comments = self.parameters.get("include_comments", False)
        max_amount = self.parameters.get("amount")
        query_parameters = {"query": self.parameters.get("query"),
                            "max_amount": max_amount}

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['start_time'] = self.parameters.get("min_date")
        if self.parameters.get("max_date"):
            query_parameters['end_time'] = self.parameters.get("max_date")

        vk_helper = vk_session.get_api()

        # Collect Newsfeed results
        num_results = 0
        self.dataset.update_status("Submitting query...")
        for i, result_batch in enumerate(self.search_newsfeed(vk_helper, **query_parameters)):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching newsfeed data from the VK API")

            self.dataset.update_status(f"Processing results batch {i+1}")
            for result in result_batch:
                result.update({'4cat_item_type': 'post'})
                yield result
                num_results += 1

                if include_comments:
                    for comment in self.collect_all_comments(vk_helper, owner_id=result.get("owner_id"), post_id=result.get("id")):
                        comment.update({'4cat_item_type': 'comment'})
                        yield comment

                self.dataset.update_status(f"Received {num_results} results of max {self.parameters.get('amount')} from the VK API")
                if max_amount:
                    # "amount" may be 0 according to the options; avoid dividing by it
                    self.dataset.update_progress(num_results / max_amount)

    def login(self, username, password):
        """
        Login and authenticate user

        The session configuration is stored per user in the configured
        sessions folder. Errors raised by `vk_api` while authenticating (such
        as the `AuthError` handled in `get_items`) are not caught here.

        :param str username:  VK username
        :param str password:  VK password
        :return:  Authenticated `vk_api.VkApi` session
        """
        session_file = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), username + "-vk_config.json")
        vk_session = vk_api.VkApi(username, password, config_filename=session_file)
        vk_session.auth()

        return vk_session

    def search_newsfeed(self, vk_helper, query, max_amount, num_collected=0, start_time=None, end_time=None, **kwargs):
        """
        Collects all newsfeed posts

        Results are requested page by page (at most 200 per request) until
        `max_amount` posts have been collected or VK reports no further page.
        The pagination token returned by VK (`next_from`) is sent back as
        `start_from`; previously it was lost between requests, so every page
        repeated the first one.

        :param Object vk_helper:  API helper of an authorised session, as returned by `VkApi.get_api()`
        :param str query:  String representing the search query
        :param int max_amount:  Max number of posts to collect
        :param int num_collected:  Number of previously collected results
        :param int start_time:  Timestamp for earliest post
        :param int end_time:  Timestamp for latest post
        :param kwargs:  May contain `start_from`, a pagination token to resume from; other keys are ignored
        :return generator:  Yields groups of posts
        """
        start_from = kwargs.get("start_from")

        while True:
            parameters = {
                "q": query,
                "extended": 1,
                "count": min(max_amount - num_collected, 200),
                "fields": self.expanded_profile_fields,
            }
            if start_time:
                parameters["start_time"] = start_time
            if end_time:
                parameters["end_time"] = end_time
            if start_from:
                parameters["start_from"] = start_from

            response = vk_helper.newsfeed.search(**parameters)
            news_feed_results = response.get("items", [])
            num_collected += len(news_feed_results)

            # Flesh out profiles and groups
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for result in news_feed_results:
                result["author_profile"] = author_profiles.get(result.get("from_id"), {})

            yield news_feed_results

            # Stop when VK has no further page, when a page came back empty
            # (guards against looping forever) or when enough was collected
            start_from = response.get("next_from")
            if not start_from or not news_feed_results or num_collected >= max_amount:
                break

    def collect_all_comments(self, vk_helper, owner_id, post_id):
        """
        Collects all comments and replies to a VK post

        :param Object vk_helper:  API helper of an authorised session, as returned by `VkApi.get_api()`
        :param int owner_id:  Owner ID provided by post/comment/etc
        :param int post_id:  ID of post from which to collect comments
        :return generator:  Yields comments and replies
        """
        # Collect top level comments from post
        for comment in self.get_comments(vk_helper, owner_id, post_id=post_id):
            yield comment

            thread = comment.get("thread", {})
            replies = thread.get("items", [])
            # Comments are requested with 10 inline replies; a full set of 10
            # with a higher reply count means there are more to fetch
            if thread.get("count", 0) > 10 and len(replies) == 10:
                # First item returned is the reply we already have, so skip it
                more_replies = self.get_comments(vk_helper, owner_id, comment_id=comment.get("id"), last_collected_id=replies[-1].get("id"))[1:]
                replies = replies + more_replies

            for reply in replies:
                yield reply
                if reply.get("thread"):
                    self.log.warning("VK Datasource issue with replies: additional depth needs to be handled; contact 4CAT devs")
                    # TODO: this will need modification if reply threads gain depth

    def get_comments(self, vk_helper, owner_id, post_id=None, comment_id=None, last_collected_id=None, **kwargs):
        """
        Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either
        post_id or comment_id, but not both.

        Comments are requested 100 at a time. When more are available, the next
        request starts at the last collected comment, whose duplicate is then
        dropped. Previously the start ID was lost on the follow-up request, so
        the same page was requested again and again.

        If the API refuses a request, the error is logged to the dataset and
        the comments collected so far (possibly none) are returned.

        More information can be found here:
        https://vk.com/dev/wall.getComments

        :param Object vk_helper:  API helper of an authorised session, as returned by `VkApi.get_api()`
        :param int owner_id:  Owner ID provided by post/comment/etc
        :param int post_id:  ID of post from which to collect comments
        :param int comment_id:  ID of comment from which to collect comments
        :param int last_collected_id:  ID of the last comment collected; used as start to continue collecting comments.
                                       The comment with this ID is included as first item of the result.
        :param kwargs:  May contain `start_comment_id` as an alias of `last_collected_id`; other keys are ignored
        :return list:  List of comments
        :raises ProcessorException:  If neither post_id nor comment_id is given
        :raises ProcessorInterruptedException:  If the worker was interrupted
        """
        if post_id is None and comment_id is None:
            raise ProcessorException("Must provide either post_id or comment_id to collect comments from VK")

        parameters = {
            "owner_id": owner_id,
            "need_likes": 1,
            "preview_length": 0,
            "extended": 1,
            "count": 100,
            "thread_items_count": 10,
            "fields": self.expanded_profile_fields,
        }
        if post_id:
            parameters["post_id"] = post_id
        if comment_id:
            parameters["comment_id"] = comment_id

        start_comment_id = last_collected_id or kwargs.get("start_comment_id")
        comments = []
        first_page = True

        while True:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

            if start_comment_id:
                parameters["start_comment_id"] = start_comment_id

            # Collect comments from VK
            try:
                response = vk_helper.wall.getComments(**parameters)
            except vk_api.exceptions.ApiError as e:
                self.dataset.log(f"Unable to collect comments for owner_id {owner_id} and {'post_id' if post_id is not None else 'comment_id'} {post_id if post_id is not None else comment_id}: {e}")
                break
            page = response.get("items", [])

            # Flesh out profiles and groups, for comments and their inline replies
            author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
            for comment in page:
                comment["author_profile"] = author_profiles.get(comment.get("from_id"), {})
                if comment.get("thread"):
                    for reply in comment.get("thread", {}).get("items", []):
                        reply["author_profile"] = author_profiles.get(reply.get("from_id"), {})

            # Follow-up pages start with the comment we already collected
            new_comments = page if first_page else page[1:]
            comments.extend(new_comments)
            first_page = False

            # A full page and a higher total mean there may be more; stop if a
            # page added nothing new, so this cannot loop forever
            if (response.get("count") or 0) > 100 and len(page) == 100 and new_comments:
                start_comment_id = page[-1].get("id")
            else:
                break

        return comments

    @staticmethod
    def expand_profile_fields(dict_of_profile_types):
        """
        Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to
        differentiate source of data later.

        :param dict dict_of_profile_types:  Lists of profile objects, keyed by the type of profile (as used in this
                                            class: `profiles` and `groups`)
        :return dict:  Profile objects keyed by their `id`
        :raises ProcessorException:  If a profile has no `id` or an `id` occurs more than once
        """
        # NOTE(review): VK appears to report community authors with a negative
        # `from_id` while group objects carry a positive `id`; confirm whether
        # lookups by `from_id` can match groups and whether user and group ids
        # may legitimately collide here.
        author_types = {}
        for profile_type, profiles in dict_of_profile_types.items():
            for profile in profiles:
                if "id" not in profile:
                    raise ProcessorException("Profile missing id field; VK data format incorrect/changed")
                if profile["id"] in author_types:
                    raise ProcessorException("Profile id duplicated across profile types; unable to combine profiles")
                profile["4CAT_author_profile_type"] = profile_type
                author_types[profile["id"]] = profile
        return author_types

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate input for a dataset query on the VK data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If no query is given or the date range ends before it starts
        """
        # Please provide something...
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # the dates need to make sense as a range to search within
        # but, on VK, you can also specify before *or* after only
        # (a missing date range is treated as "no limits" instead of crashing)
        after, before = query.get("daterange") or (None, None)
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # TODO: test username and password?

        # if we made it this far, the query can be executed
        return {
            "query": query.get("query"),
            "query_type": query.get("query_type"),
            "amount": query.get("amount"),
            "include_comments": query.get("include_comments"),
            "min_date": after,
            "max_date": before,
            "username": query.get("username"),
            "password": query.get("password"),
        }

    @staticmethod
    def map_item(item):
        """
        Map a nested VK object to a flat dictionary

        :param item:  VK object as originally returned by the VK API, with the `4cat_item_type` and `author_profile`
                      keys added while collecting
        :return dict:  Dictionary in the format expected by 4CAT
        """
        vk_item_time = datetime.datetime.fromtimestamp(item.get('date'))

        # Process attachments
        photos = []
        videos = []
        audio = []
        links = []
        docs = []
        for attachment in item.get("attachments", []):
            attachment_type = attachment.get("type")
            details = attachment.get(attachment_type)
            if not details:
                # No payload stored under the attachment's type; nothing to map
                continue

            if attachment_type == "photo":
                sizes = details.get("sizes")
                if sizes:
                    # Use the URL of the widest version available
                    largest = max(sizes, key=lambda size: size.get("width", 0))
                    photos.append(largest.get("url", ""))
                else:
                    photos.append(str(details))
            elif attachment_type == "video":
                # TODO: can I get the actual URL? Does not seem like it...
                videos.append(f"https://vk.com/video{details.get('owner_id')}_{details.get('id')}")
            elif attachment_type == "audio":
                # TODO: Seem unable to create the URL with provided information...
                audio.append(f"{details.get('artist')} - {details.get('title')}")
            elif attachment_type == "link":
                links.append(details.get('url', str(details)))
            elif attachment_type == "doc":
                docs.append(details.get('url', str(details)))

        # Use 4cat_item_type to populate different fields
        thread_id = ""
        in_reply_to_user = ""
        in_reply_to_comment_id = ""
        if item.get("4cat_item_type") == "post":
            thread_id = item.get("id")
        elif item.get("4cat_item_type") == "comment":
            thread_id = item.get("post_id")
            in_reply_to_user = item.get("reply_to_user")
            in_reply_to_comment_id = item.get("reply_to_comment")

        author_profile = item.get("author_profile", {})

        # expand_profile_fields stores the plural names ("profiles"/"groups");
        # the singular forms are accepted as well
        profile_type = author_profile.get("4CAT_author_profile_type")
        if profile_type in ("profile", "profiles"):
            profile_source = "user"
        elif profile_type in ("group", "groups"):
            profile_source = "community"
        else:
            profile_source = "N/A"
        # Use source of author profile if "type" not present (e.g., in users profiles do not seem to have type)
        author_type = author_profile.get("type", profile_source)

        def decode(key, labels):
            """Translate a coded profile value to its label; unknown values are kept, missing ones become N/A"""
            value = author_profile.get(key, "N/A")
            for code, label in labels:
                if value == code:
                    return label
            return value

        return MappedItem({
            "id": item.get("id"),
            "thread_id": thread_id,
            "timestamp": vk_item_time.strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(vk_item_time.timestamp()),
            "link": f"https://vk.com/wall{item.get('owner_id')}_{item.get('id')}",
            "item_type": item.get("4cat_item_type"),
            "body": item.get("text"),
            "author_id": item.get("from_id"),
            "author_type": author_type,
            "author_screen_name": author_profile.get("screen_name"),
            "author_name": author_profile.get("name", " ".join([author_profile.get("first_name", ""), author_profile.get("last_name", "")])),
            "author_sex": decode("sex", ((1, "F"), (2, "M"), (0, "Not Specified"))),
            "author_city": author_profile.get("city", {}).get("title", ""),
            "author_country": author_profile.get("country", {}).get("title", ""),
            "author_photo": author_profile.get("photo_200",
                                               author_profile.get("photo_100", author_profile.get("photo_50", ""))),
            "author_is_admin": decode("is_admin", ((1, True), (0, False))),
            "author_is_advertiser": decode("is_advertiser", ((1, True), (0, False))),
            # The field requested from VK is "deactivated"; the old key is kept as fallback
            "author_deactivated": author_profile.get("deactivated", author_profile.get("is_deactivated", False)),
            "author_privacy_is_closed": decode("is_closed", ((1, "closed"), (0, "open"), (2, "private"))),
            "author_followers": author_profile.get("followers_count", author_profile.get("members_count", "N/A")),
            "in_reply_to_user": in_reply_to_user,
            "in_reply_to_comment_id": in_reply_to_comment_id,
            "source": item.get("post_source", {}).get("type"),
            "views": item.get("views", {}).get("count"),
            "likes": item.get("likes", {}).get("count"),
            "post_comments": item.get("comments", {}).get("count"),
            "edited": datetime.datetime.fromtimestamp(item.get("edited")).strftime("%Y-%m-%d %H:%M:%S") if item.get("edited", False) else False,
            "photos": ", ".join(photos),
            "videos": ", ".join(videos),
            "audio": ", ".join(audio),
            "links": ", ".join(links),
            "docs": ", ".join(docs),
            "subject": "",
        })
Get posts via the VK API
@classmethod
def get_options(cls, parent_dataset=None, config=None):
    """
    Describe the input fields offered when creating a VK dataset

    The returned dictionary is ordered: entries appear in the order in which
    they are listed here.

    :param parent_dataset:  Should always be None
    :param config:  Not read by this method
    :return dict:  Data source options, keyed by option name
    """
    info_type = UserInput.OPTION_INFO
    text_type = UserInput.OPTION_TEXT

    def note(message):
        # purely informational block
        return {"type": info_type, "help": message}

    def credential(label):
        # login details are flagged as sensitive and cached
        return {"type": text_type, "sensitive": True, "cache": True, "help": label}

    return {
        "intro-1": note("This data source uses VK's [API](https://vk.com/dev/first_guide) and a python "
                        "[wrapper](https://github.com/python273/vk_api) to request information from VK using your "
                        "username and password."),
        "query_type": {
            "type": UserInput.OPTION_CHOICE,
            "help": "Query Type",
            "options": {
                "newsfeed": "News Feed search",
            },
            "default": "newsfeed"
        },
        "intro-2": note("Your username and password will be deleted after your query is complete."),
        "username": credential("VK Username"),
        "password": credential("VK Password"),
        "intro-3": note("Enter the text to search for below."),
        "query": {
            "type": UserInput.OPTION_TEXT_LARGE,
            "help": "Query"
        },
        "amount": {
            "type": text_type,
            "help": "Max items to retrieve",
            "min": 0,
            "max": 1000,
            "default": 100
        },
        "include_comments": {
            "type": UserInput.OPTION_TOGGLE,
            "help": "Include post comments",
            "default": False,
            "tooltip": ""
        },
        "divider-2": {
            "type": UserInput.OPTION_DIVIDER
        },
        "daterange-info": note("VK daterange defaults vary by type of query. For the News Feed, posts are returned starting "
                               "with the most recent and working backwards."),
        "daterange": {
            "type": UserInput.OPTION_DATERANGE,
            "help": "Date range"
        },
    }
Get VK data source options
Parameters
- config: Configuration reader (not read by this method)
- parent_dataset: Should always be None
Returns
Data source options
def get_items(self, query):
    """
    Log in to VK and yield the posts (and optionally comments) for a query

    All search settings are read from `self.parameters`; the `query` argument
    is accepted for interface compatibility but not read.

    Every yielded item gets a `4cat_item_type` key: `post` for news feed
    results, `comment` for comments and replies collected for a post.

    :param query:  Not used; see above
    :return generator:  Yields VK items as dictionaries. Yields nothing if
    the credentials are missing, logging in fails or the query type is not
    `newsfeed`.
    :raises ProcessorInterruptedException:  If the worker is interrupted
    between result batches
    """
    username = self.parameters.get("username")
    password = self.parameters.get("password")
    if username is None or password is None:
        self.dataset.update_status(
            "VK query failed or was interrupted; please create new query in order to provide username and password again.",
            is_final=True)
        return

    self.dataset.update_status("Logging in to VK")
    try:
        vk_session = self.login(username, password)
    except vk_api.exceptions.AuthError as e:
        self.log.warning(f"VK Auth Issues: {e}")
        self.dataset.update_status(f"VK unable to authorize user: {e}", is_final=True)
        return

    if self.parameters.get("query_type") != "newsfeed":
        # only news feed searches are implemented
        return

    max_amount = self.parameters.get("amount")
    include_comments = self.parameters.get("include_comments", False)
    query_parameters = {"query": self.parameters.get("query"),
                        "max_amount": max_amount}

    # Add start and end dates if provided
    if self.parameters.get("min_date"):
        query_parameters["start_time"] = self.parameters.get("min_date")
    if self.parameters.get("max_date"):
        query_parameters["end_time"] = self.parameters.get("max_date")

    vk_helper = vk_session.get_api()

    # Collect Newsfeed results
    num_results = 0
    self.dataset.update_status("Submitting query...")
    for i, result_batch in enumerate(self.search_newsfeed(vk_helper, **query_parameters)):
        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching newsfeed data from the VK API")

        self.dataset.update_status(f"Processing results batch {i+1}")
        for result in result_batch:
            result.update({'4cat_item_type': 'post'})
            yield result
            num_results += 1

            if include_comments:
                for comment in self.collect_all_comments(vk_helper, owner_id=result.get("owner_id"), post_id=result.get("id")):
                    comment.update({'4cat_item_type': 'comment'})
                    yield comment

            self.dataset.update_status(f"Received {num_results} results of max {max_amount} from the VK API")
            # the amount option allows 0 and may be absent; a relative
            # progress cannot be computed in that case, so skip it instead
            # of dividing by zero
            if max_amount:
                self.dataset.update_progress(min(1, num_results / max_amount))
Use the VK API
Parameters
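- query: Query parameters (not read directly; the search settings are taken from the dataset's stored parameters)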
Returns
def login(self, username, password):
    """
    Open a VK session and authenticate it with the given credentials

    The session configuration file is placed in the folder configured as
    PATH_SESSIONS (relative to PATH_ROOT) and is named after the username.

    :param str username:  VK username
    :param str password:  VK password
    :return:  The `vk_api.VkApi` session object, after `auth()` was called on it
    """
    # NOTE(review): the username ends up in a file name as-is - confirm that
    # path separators cannot occur in it
    session_file = username + "-vk_config.json"
    root_folder = Path(self.config.get("PATH_ROOT"))
    config_path = root_folder.joinpath(self.config.get("PATH_SESSIONS"), session_file)

    session = vk_api.VkApi(username, password, config_filename=config_path)
    session.auth()
    return session
Login and authenticate user
def search_newsfeed(self, vk_helper, query, max_amount, num_collected=0, start_time=None, end_time=None, **kwargs):
    """
    Collects all newsfeed posts

    Results are requested page by page. The `next_from` value of each
    response is sent as `start_from` with the following request, so that
    every request continues where the previous one ended.

    :param Object vk_helper:    Authorized vk_api.VkApi
    :param str query:           String representing the search query
    :param int max_amount:      Max number of posts to collect
    :param int num_collected:   Number of previously collected results
    :param int start_time:      Timestamp for earliest post
    :param int end_time:        Timestamp for latest post
    :param kwargs:              `start_from` may be given to resume from a
                                pagination cursor; other keys are ignored
    :return generator:          Yields groups of posts
    """
    page_limit = 200  # page size cap used for each newsfeed.search request
    start_from = kwargs.get("start_from")

    while True:
        remaining = max_amount - num_collected
        parameters = {
            "q": query,
            "extended": 1,
            "count": min(remaining, page_limit),
            "fields": self.expanded_profile_fields,
        }
        if start_time:
            parameters["start_time"] = start_time
        if end_time:
            parameters["end_time"] = end_time
        if start_from:
            # without the cursor the same first page would be returned again
            parameters["start_from"] = start_from

        response = vk_helper.newsfeed.search(**parameters)
        news_feed_results = response.get("items", [])
        num_collected += len(news_feed_results)

        # Flesh out profiles and groups
        author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
        for result in news_feed_results:
            result["author_profile"] = author_profiles.get(result.get("from_id"), {})

        yield news_feed_results

        # Collect additional results only while there is a cursor, the last
        # page contained something and the requested amount is not reached
        start_from = response.get("next_from")
        if not start_from or not news_feed_results or num_collected >= max_amount:
            return
Collects all newsfeed posts
Parameters
- Object vk_helper: Authorized vk_api.VkApi
- str query: String representing the search query
- int max_amount: Max number of posts to collect
- int num_collected: Number of previously collected results
- int start_time: Timestamp for earliest post
- int end_time: Timestamp for latest post
Returns
Yields groups of posts
def collect_all_comments(self, vk_helper, owner_id, post_id):
    """
    Walk through the comments of a VK post, including replies to them

    Each top level comment is yielded first, directly followed by the
    replies in its thread. When a thread holds more replies than the ten
    that come with the comment, the remainder is requested separately.

    :param Object vk_helper:   Authorized vk_api.VkApi
    :param int owner_id:       Owner ID provided by post/comment/etc
    :param int post_id:        ID of post from which to collect comments
    :return generator:         Yields comments and replies
    """
    for top_level in self.get_comments(vk_helper, owner_id, post_id=post_id):
        yield top_level

        total_replies = top_level.get("thread", {}).get("count", 0)
        thread_items = top_level.get("thread", {}).get("items", [])

        if total_replies > 10 and len(thread_items) == 10:
            # the thread was cut off; continue from the last reply we have
            # and drop the first result, which is that same reply again
            remainder = self.get_comments(vk_helper, owner_id,
                                          comment_id=top_level.get("id"),
                                          last_collected_id=thread_items[-1].get("id"))
            thread_items.extend(remainder[1:])

        for reply in thread_items:
            yield reply
            if reply.get("thread"):
                # TODO: this will need modification if reply threads gain depth
                self.log.warning("VK Datasource issue with replies: additional depth needs to be handled; contact 4CAT devs")
Collects all comments and replies to a VK post
Parameters
- Object vk_helper: Authorized vk_api.VkApi
- int owner_id: Owner ID provided by post/comment/etc
- int post_id: ID of post from which to collect comments
Returns
Yields comments and replies
def get_comments(self, vk_helper, owner_id, post_id=None, comment_id=None, last_collected_id=None, **kwargs):
    """
    Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either
    post_id or comment_id.

    Comments are requested in pages of 100. While a full page is returned
    and the reported total exceeds the page size, the next page is requested
    starting at the last comment received; that comment is returned again by
    VK as the first item of the next page and is dropped there.

    If `last_collected_id` is given, the first item of the returned list is
    the comment with that ID (callers are expected to discard it).

    More information can be found here:
    https://vk.com/dev/wall.getComments

    :param Object vk_helper:       Authorized vk_api.VkApi
    :param int owner_id:           Owner ID provided by post/comment/etc
    :param int post_id:            ID of post from which to collect comments
    :param int comment_id:         ID of comment from which to collect comments
    :param int last_collected_id:  ID of the last comment collected; used as start to continue collecting comments
    :param kwargs:                 Ignored
    :return list:                  List of comments; what was collected so far (possibly nothing) if the API
                                   returns an error
    :raises ProcessorInterruptedException:  If the worker is interrupted
    :raises ProcessorException:  If neither post_id nor comment_id is given
    """
    if self.interrupted:
        raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

    if post_id is None and comment_id is None:
        raise ProcessorException("Must provide either post_id or comment_id to collect comments from VK")

    page_size = 100
    parameters = {
        "owner_id": owner_id,
        "need_likes": 1,
        "preview_length": 0,
        "extended": 1,
        "count": page_size,
        "thread_items_count": 10,
        "fields": self.expanded_profile_fields,
    }
    if post_id is not None:
        parameters["post_id"] = post_id
    if comment_id is not None:
        parameters["comment_id"] = comment_id

    collected = []
    start_id = last_collected_id
    first_page = True
    while True:
        if start_id:
            parameters["start_comment_id"] = start_id

        # Collect comments from VK
        try:
            response = vk_helper.wall.getComments(**parameters)
        except vk_api.exceptions.ApiError as e:
            self.dataset.log(f"Unable to collect comments for owner_id {owner_id} and {'post_id' if post_id is not None else 'comment_id'} {post_id if post_id is not None else comment_id}: {e}")
            break
        page = response.get("items", [])

        # Flesh out profiles and groups, also for the replies that come
        # embedded in each comment's thread
        author_profiles = self.expand_profile_fields({"profiles": response.get("profiles", []), "groups": response.get("groups", [])})
        for comment in page:
            comment["author_profile"] = author_profiles.get(comment.get("from_id"), {})
            for reply in (comment.get("thread") or {}).get("items", []):
                reply["author_profile"] = author_profiles.get(reply.get("from_id"), {})

        # follow-up pages start with the comment the previous page ended on
        collected.extend(page if first_page else page[1:])
        first_page = False

        # Check if there are potentially additional comments
        if not ((response.get("count") or 0) > page_size and len(page) == page_size):
            break

        next_start = page[-1].get("id")
        if next_start == start_id:
            # no progress was made; stop rather than request the same page forever
            break
        start_id = next_start

        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching comments from the VK API")

    return collected
Collect comments from either a post or another comment (i.e., replies to another comment). Must provide either post_id or comment_id, but not both.
More information can be found here: https://vk.com/dev/wall.getComments
Parameters
- Object vk_helper: Authorized vk_api.VkApi
- int owner_id: Owner ID provided by post/comment/etc
- int post_id: ID of post from which to collect comments
- int comment_id: ID of comment from which to collect comments
- int last_collected_id: ID of the last comment collected; used as the starting point to continue collecting comments
Returns
List of comments
@staticmethod
def expand_profile_fields(dict_of_profile_types):
    """
    Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to
    differentiate source of data later.

    The result is meant to be queried with the `from_id` of a post or
    comment. VK identifies communities with a negative `from_id`, while the
    community objects themselves carry a positive `id`; entries from the
    `groups` list are therefore stored under the negated ID. This also keeps
    a user and a community that share the same number apart.

    The profile dictionaries that are passed in are updated in place.

    :param dict dict_of_profile_types:  Lists of profile dictionaries, keyed
    by the kind of profile (`profiles` or `groups`)
    :return dict:  Profile dictionaries, keyed by the author ID as used in
    `from_id`
    :raises ProcessorException:  If a profile has no `id`, or if two
    profiles end up with the same lookup ID
    """
    author_types = {}
    for profile_type, profiles in dict_of_profile_types.items():
        for profile in profiles:
            if "id" not in profile:
                raise ProcessorException("Profile missing id field; VK data format incorrect/changed")

            lookup_id = profile["id"]
            if profile_type == "groups" and isinstance(lookup_id, int) and lookup_id > 0:
                lookup_id = -lookup_id

            if lookup_id in author_types:
                raise ProcessorException("Profile id duplicated across profile types; unable to combine profiles")

            profile.update({"4CAT_author_profile_type": profile_type})
            author_types[lookup_id] = profile
    return author_types
Combine various VK profile and group author information for easy lookup. Add 4CAT_author_profile_type field to differentiate source of data later.
@staticmethod
def validate_query(query, request, config):
    """
    Validate input for a dataset query on the VK data source.

    Will raise a QueryParametersException if invalid parameters are
    encountered. Only the parameters this data source uses are returned.

    :param dict query:  Query parameters, from client-side.
    :param request:  Flask request (not read by this method)
    :param ConfigManager|None config:  Configuration reader (context-aware;
    not read by this method)
    :return dict:  Safe query parameters
    :raises QueryParametersException:  If no (non-blank) query is given or
    if the date range ends before it starts
    """
    # Please provide something... (a query of only whitespace is nothing)
    search_text = query.get("query", None)
    if not search_text or not str(search_text).strip():
        raise QueryParametersException("Please provide a query.")

    # the dates need to make sense as a range to search within
    # but, on VK, you can also specify before *or* after only
    # (a missing date range is treated as "no limits")
    after, before = query.get("daterange") or (None, None)
    if before and after and before < after:
        raise QueryParametersException("Date range must start before it ends")

    # TODO: test username and password?

    # if we made it this far, the query can be executed
    return {
        "query": search_text,
        "query_type": query.get("query_type"),
        "amount": query.get("amount"),
        "include_comments": query.get("include_comments"),
        "min_date": after,
        "max_date": before,
        "username": query.get("username"),
        "password": query.get("password"),
    }
Validate input for a dataset query on the VK data source.
Will raise a QueryParametersException if invalid parameters are encountered. Parameters are additionally sanitised.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
@staticmethod
def map_item(item):
    """
    Map a nested VK object to a flat dictionary

    Expects the item to carry the `4cat_item_type` (`post` or `comment`) and
    `author_profile` keys that are added while collecting.

    :param item:  VK object as originally returned by the VK API
    :return dict:  Dictionary in the format expected by 4CAT
    """
    # NOTE(review): fromtimestamp() without a timezone formats the time in
    # the timezone of the machine running this code - confirm this is intended
    vk_item_time = datetime.datetime.fromtimestamp(item.get("date"))

    # Process attachments
    photos = []
    videos = []
    audio = []
    links = []
    docs = []
    for wrapper in item.get("attachments", []):
        attachment_type = wrapper.get("type")
        attachment = wrapper.get(attachment_type)
        if attachment is None:
            # nothing stored under the key named by "type"; nothing to map
            continue

        if attachment_type == "photo":
            sizes = attachment.get("sizes")
            if sizes:
                # use the widest available version of the photo
                largest = max(sizes, key=lambda size: size.get("width") or 0)
                photos.append(largest.get("url") or str(attachment))
            else:
                photos.append(str(attachment))
        elif attachment_type == "video":
            # TODO: can I get the actual URL? Does not seem like it...
            videos.append(f"https://vk.com/video{attachment.get('owner_id')}_{attachment.get('id')}")
        elif attachment_type == "audio":
            # TODO: Seem unable to create the URL with provided information...
            audio.append(f"{attachment.get('artist')} - {attachment.get('title')}")
        elif attachment_type == "link":
            links.append(attachment.get("url") or str(attachment))
        elif attachment_type == "doc":
            docs.append(attachment.get("url") or str(attachment))

    # Use 4cat_item_type to populate different fields
    item_type = item.get("4cat_item_type")
    thread_id = ""
    in_reply_to_user = ""
    in_reply_to_comment_id = ""
    link = f"https://vk.com/wall{item.get('owner_id')}_{item.get('id')}"
    if item_type == "post":
        thread_id = item.get("id")
    elif item_type == "comment":
        thread_id = item.get("post_id")
        in_reply_to_user = item.get("reply_to_user")
        in_reply_to_comment_id = item.get("reply_to_comment")
        if thread_id is not None:
            # a comment is addressed via the post it belongs to; its own ID
            # is not a valid wall post ID
            link = f"https://vk.com/wall{item.get('owner_id')}_{thread_id}?reply={item.get('id')}"

    author_profile = item.get("author_profile", {})

    def decode(field, labels):
        """Translate a coded profile value; unknown values are passed through, absent ones become N/A"""
        value = author_profile.get(field, "N/A")
        for code, label in labels:
            if value == code:
                return label
        return value

    # the profile type is stored under the name of the list the profile
    # came from ("profiles"/"groups"); the singular forms are accepted too
    profile_kind = author_profile.get("4CAT_author_profile_type")
    if profile_kind in ("profile", "profiles"):
        profile_source = "user"
    elif profile_kind in ("group", "groups"):
        profile_source = "community"
    else:
        profile_source = "N/A"
    # Use source of author profile if "type" not present (e.g., in users profiles do not seem to have type)
    author_type = author_profile.get("type", profile_source)

    full_name = " ".join(filter(None, [author_profile.get("first_name", ""), author_profile.get("last_name", "")]))
    yes_no = ((1, True), (0, False))

    return MappedItem({
        "id": item.get("id"),
        "thread_id": thread_id,
        "timestamp": vk_item_time.strftime("%Y-%m-%d %H:%M:%S"),
        "unix_timestamp": int(vk_item_time.timestamp()),
        "link": link,
        "item_type": item_type,
        "body": item.get("text"),
        "author_id": item.get("from_id"),
        "author_type": author_type,
        "author_screen_name": author_profile.get("screen_name"),
        "author_name": author_profile.get("name", full_name),
        "author_sex": decode("sex", ((1, "F"), (2, "M"), (0, "Not Specified"))),
        "author_city": author_profile.get("city", {}).get("title", ""),
        "author_country": author_profile.get("country", {}).get("title", ""),
        "author_photo": author_profile.get("photo_200",
                                           author_profile.get("photo_100", author_profile.get("photo_50", ""))),
        "author_is_admin": decode("is_admin", yes_no),
        "author_is_advertiser": decode("is_advertiser", yes_no),
        # "deactivated" is the field requested from VK; "is_deactivated" is
        # kept as a fallback for items that carry it
        "author_deactivated": author_profile.get("deactivated", author_profile.get("is_deactivated", False)),
        "author_privacy_is_closed": decode("is_closed", ((1, "closed"), (0, "open"), (2, "private"))),
        "author_followers": author_profile.get("followers_count", author_profile.get("members_count", "N/A")),
        "in_reply_to_user": in_reply_to_user,
        "in_reply_to_comment_id": in_reply_to_comment_id,
        "source": item.get("post_source", {}).get("type"),
        "views": item.get("views", {}).get("count"),
        "likes": item.get("likes", {}).get("count"),
        "post_comments": item.get("comments", {}).get("count"),
        "edited": datetime.datetime.fromtimestamp(item.get("edited")).strftime("%Y-%m-%d %H:%M:%S") if item.get("edited", False) else False,
        "photos": ", ".join(photos),
        "videos": ", ".join(videos),
        "audio": ", ".join(audio),
        "links": ", ".join(links),
        "docs": ", ".join(docs),
        "subject": "",
    })
Map a nested VK object to a flat dictionary
Parameters
- item: VK object as originally returned by the VK API
Returns
Dictionary in the format expected by 4CAT
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor