datasources.bsky.search_bsky
Collect Bluesky posts
1""" 2Collect Bluesky posts 3""" 4import hashlib 5import time 6from datetime import datetime 7from pathlib import Path 8 9from dateutil import parser 10 11from atproto import Client, Session, SessionEvent 12from atproto_client.exceptions import UnauthorizedError, BadRequestError, InvokeTimeoutError, RequestException, \ 13 ModelError, NetworkError 14 15from backend.lib.search import Search 16from common.lib.exceptions import QueryParametersException, QueryNeedsExplicitConfirmationException, \ 17 ProcessorInterruptedException 18from common.lib.helpers import timify_long 19from common.lib.user_input import UserInput 20from common.config_manager import config 21from common.lib.item_mapping import MappedItem, MissingMappedField 22 23class SearchBluesky(Search): 24 """ 25 Search for posts in Bluesky 26 """ 27 type = "bsky-search" # job ID 28 category = "Search" # category 29 title = "Bluesky Search" # title displayed in UI 30 description = "Collects Bluesky posts via its API." # description displayed in UI 31 extension = "ndjson" # extension of result file, used internally and in UI 32 is_local = False # Whether this datasource is locally scraped 33 is_static = False # Whether this datasource is still updated 34 35 config = { 36 "bsky-search.max_results": { 37 "type": UserInput.OPTION_TEXT, 38 "help": "Maximum results per query", 39 "coerce_type": int, 40 "min": 0, 41 "default": 50000, 42 "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited." 43 } 44 } 45 46 handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"] 47 48 @classmethod 49 def get_options(cls, parent_dataset=None, user=None): 50 """ 51 Get processor options 52 53 Just updates the description of the entities field based on the 54 configured max entities. 
55 56 :param DataSet parent_dataset: An object representing the dataset that 57 the processor would be run on 58 :param User user: Flask user the options will be displayed for, in 59 case they are requested for display in the 4CAT web interface. This can 60 be used to show some options only to privileges users. 61 """ 62 options = { 63 "intro": { 64 "type": UserInput.OPTION_INFO, 65 "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server " 66 "and stored there while data is fetched. After the dataset has been created your credentials " 67 "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)." 68 }, 69 "username": { 70 "type": UserInput.OPTION_TEXT, 71 "help": "Bluesky Username", 72 "cache": True, 73 "sensitive": True, 74 }, 75 "password": { 76 "type": UserInput.OPTION_TEXT, 77 "help": "Bluesky Password", 78 "cache": True, # tells the frontend to cache this value 79 "sensitive": True, # tells the backend to delete this value after use 80 "password": True, # tells the frontend this is a password type 81 }, 82 "divider": { 83 "type": UserInput.OPTION_DIVIDER 84 }, 85 "query": { 86 "type": UserInput.OPTION_TEXT_LARGE, 87 "help": "Search Queries", 88 "tooltip": "Separate with commas or line breaks." 89 }, 90 "max_posts": { 91 "type": UserInput.OPTION_TEXT, 92 "help": "Max posts per query", 93 "min": 1, 94 "default": 100 95 }, 96 "daterange": { 97 "type": UserInput.OPTION_DATERANGE, 98 "help": "Date range", 99 "tooltip": "The date range for the search. No date range will search all posts." 100 }, 101 } 102 103 # Update the max_posts setting from config 104 max_posts = int(config.get('bsky-search.max_results', 100, user=user)) 105 if max_posts == 0: 106 # This is potentially madness 107 options["max_posts"]["tooltip"] = "Set to 0 to collect all posts." 
108 options['max_posts']['min'] = 0 109 else: 110 options["max_posts"]["max"] = max_posts 111 options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts 112 113 return options 114 115 @staticmethod 116 def validate_query(query, request, user): 117 """ 118 Validate Bluesky query 119 120 :param dict query: Query parameters, from client-side. 121 :param request: Flask request 122 :param User user: User object of user who has submitted the query 123 :return dict: Safe query parameters 124 """ 125 # no query 4 u 126 if not query.get("query", "").strip(): 127 raise QueryParametersException("You must provide a search query.") 128 129 if not query.get("username", None) or not query.get("password", None) : 130 raise QueryParametersException("You need to provide valid Bluesky login credentials first.") 131 132 # Test login credentials 133 session_id = SearchBluesky.create_session_id(query["username"], query["password"]) 134 try: 135 SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id) 136 except UnauthorizedError as e: 137 raise QueryParametersException("Invalid Bluesky login credentials.") 138 except RequestException as e: 139 if e.response.content.message == 'Rate Limit Exceeded': 140 lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"])) 141 raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.") 142 else: 143 raise QueryParametersException(f"Bluesky login failed. 
{e.response.content.message}") 144 145 # sanitize query 146 sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()] 147 148 # the dates need to make sense as a range to search within 149 min_date, max_date = query.get("daterange") 150 if min_date and max_date and min_date > max_date: 151 raise QueryParametersException("The start date must be before the end date.") 152 153 # Only check this if not already confirmed by the frontend 154 posts_per_second = 55 # gathered from simply checking start/end times of logs 155 if not query.get("frontend-confirm"): 156 # Estimate is not returned; use max_posts as a rough estimate 157 max_posts = query.get("max_posts", 100) 158 expected_tweets = query.get("max_posts", 100) * len(sanitized_query) 159 # Warn if process may take more than ~1 hours 160 if expected_tweets > (posts_per_second * 3600): 161 expected_time = timify_long(expected_tweets / posts_per_second) 162 raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?") 163 elif max_posts == 0 and not min_date: 164 raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?") 165 elif max_posts == 0: 166 raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query may take a long time to complete. Do you want to continue?") 167 168 return { 169 "max_posts": query.get("max_posts"), 170 "query": ",".join(sanitized_query), 171 "username": query.get("username"), 172 "password": query.get("password"), 173 "session_id": session_id, 174 "min_date": min_date, 175 "max_date": max_date, 176 } 177 178 def get_items(self, query): 179 """ 180 Execute a query; get messages for given parameters 181 182 Basically a wrapper around execute_queries() to call it with asyncio. 
183 184 :param dict query: Query parameters, as part of the DataSet object 185 :return list: Posts, sorted by thread and post ID, in ascending order 186 """ 187 if not query.get("session_id") and (not query.get("username") or not query.get("password")): 188 return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.") 189 190 session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"] 191 try: 192 client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id) 193 except (UnauthorizedError, RequestException, BadRequestError) as e: 194 self.dataset.log(f"Bluesky login failed: {e}") 195 return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.") 196 197 self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}") 198 199 max_posts = query.get("max_posts", 100) 200 limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts 201 202 # Handle reference mapping; user references use did instead of dynamic handle 203 did_to_handle = {} 204 205 query_parameters = { 206 "limit": limit, 207 } 208 209 # Add start and end dates if provided 210 if self.parameters.get("min_date"): 211 query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ') 212 if self.parameters.get("max_date"): 213 query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ') 214 215 queries = query.get("query").split(",") 216 num_queries = len(queries) 217 total_posts = 0 218 i = 0 219 last_query = None 220 last_date = None 221 while queries: 222 query = queries.pop(0) 223 if query == last_query: 224 # Check if there are continued posts from the last query 225 query_parameters['until'] = 
last_date.strftime('%Y-%m-%dT%H:%M:%SZ') 226 self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}") 227 else: 228 # New query 229 query_post_ids = set() 230 i += 1 231 rank = 0 232 last_query = query 233 last_date = None 234 self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}") 235 query_requests = 0 236 237 query_parameters["q"] = query 238 cursor = None # Start with no cursor (first page) 239 search_for_invalid_post = False 240 invalid_post_counter = 0 241 while True: 242 if self.interrupted: 243 raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API") 244 # Query posts, including pagination (cursor for next page) 245 tries = 0 246 response = None 247 while tries < 3: 248 query_parameters["cursor"] = cursor 249 try: 250 response = client.app.bsky.feed.search_posts(params=query_parameters) 251 break 252 except ModelError as e: 253 # Post validation error; one post is unable to be read 254 # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in 255 # order to collect post by post, invalid post is identified again, we switch back to higher 256 # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError 257 # is raised with detail refering to a server error 502 InternalServerError, we catch that and 258 # add the query back to the queue with a new "until" date to continue the query 259 # https://github.com/bluesky-social/atproto/issues/3446 260 if not search_for_invalid_post: 261 # New invalid post, search and skip 262 self.dataset.log(f"Invalid post detected; searching post by post: {e}") 263 search_for_invalid_post = True 264 # Currently we must search post by post to find the invalid post 265 query_parameters["limit"] = 1 266 else: 267 # Found invalid post, skip, reset counters 268 self.dataset.log( 269 f"Invalid post identified; skipping and continue with query as 
normal: {e}") 270 search_for_invalid_post = False 271 # Reset limit to normal 272 query_parameters["limit"] = limit 273 invalid_post_counter = 0 274 cursor = str(int(cursor) + 1) if cursor else None 275 # Re-query with new cursor & limit 276 continue 277 278 except InvokeTimeoutError as e: 279 # Timeout error, but can occur for odd queries with no results 280 self.dataset.log(f"Bluesky request error for query {query}: {e}") 281 time.sleep(1) 282 tries += 2 283 continue 284 except NetworkError as e: 285 # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal 286 self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}") 287 time.sleep(1 + (tries * 10)) 288 queries.insert(0, query) 289 break 290 291 if not response: 292 # Expected from NetworkError, but the query will have been added back to the queue 293 # If not, then there was a problem with the query 294 if len(queries) == 0: 295 self.dataset.update_status(f"Error collecting posts from Bluesky; see log for details", is_final=True) 296 if query not in queries: 297 # Query was not added back; there was an unexpected issue with the query itself 298 self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query") 299 break 300 301 query_requests += 1 302 items = response['posts'] if hasattr(response, 'posts') else [] 303 304 if search_for_invalid_post: 305 invalid_post_counter += 1 306 if invalid_post_counter >= 100: 307 # Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely 308 self.dataset.log(f"Unable to identify invalid post; discontinuing search") 309 query_parameters["limit"] = limit 310 search_for_invalid_post = False 311 invalid_post_counter = 0 312 313 if not items: 314 # Sometimes no post is returned, but there still may be posts following 315 
self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}") 316 # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised 317 cursor = str(int(cursor) + 1) if cursor else None 318 continue 319 320 new_posts = 0 321 # Handle the posts 322 for item in items: 323 if 0 < max_posts <= rank: 324 break 325 326 if self.interrupted: 327 raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API") 328 329 post = item.model_dump() 330 post_id = post["uri"] 331 # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed 332 last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at")) 333 if post_id in query_post_ids: 334 # Skip duplicate posts 335 continue 336 337 new_posts += 1 338 query_post_ids.add(post_id) 339 340 # Add user handles from references 341 did_to_handle[post["author"]["did"]] = post["author"]["handle"] 342 # Mentions 343 mentions = [] 344 if post["record"].get("facets"): 345 for facet in post["record"]["facets"]: 346 for feature in facet.get("features", {}): 347 if feature.get("did"): 348 if feature["did"] in did_to_handle: 349 mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]}) 350 else: 351 handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"]) 352 if handle: 353 if handle.lower() in self.handle_lookup_error_messages: 354 self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}") 355 mentions.append({"did": feature["did"], "handle": handle}) 356 did_to_handle[feature["did"]] = handle 357 else: 358 mentions.append({"did": feature["did"], "handle": None}) 359 self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}") 360 # Reply to 361 reply_to_handle = None 362 if post["record"].get("reply"): 363 reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2] 364 if reply_to_did in 
did_to_handle: 365 reply_to_handle = did_to_handle[reply_to_did] 366 else: 367 handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did) 368 if handle: 369 if handle.lower() in self.handle_lookup_error_messages: 370 self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}") 371 reply_to_handle = handle 372 did_to_handle[reply_to_did] = handle 373 else: 374 self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}") 375 376 377 post.update({"4CAT_metadata": { 378 "collected_at": datetime.now().timestamp(), 379 "query": query, 380 "rank": rank, 381 "mentions": mentions, 382 "reply_to": reply_to_handle if reply_to_handle else None, 383 }}) 384 rank += 1 385 yield post 386 total_posts += 1 387 388 # Check if there is a cursor for the next page 389 cursor = response['cursor'] 390 if max_posts != 0: 391 self.dataset.update_progress(total_posts / (max_posts * num_queries)) 392 393 if 0 < max_posts <= rank: 394 self.dataset.update_status( 395 f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}") 396 break 397 398 if not cursor: 399 if new_posts: 400 # Bluesky API seems to stop around 10000 posts and not return a cursor 401 # Re-query with the same query to get the next set of posts using last_date (set above) 402 self.dataset.log(f"Query {query}: {query_requests} requests") 403 queries.insert(0, query) 404 else: 405 # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done 406 self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned") 407 408 if rank: 409 self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}") 410 break # No more pages, stop the loop 411 elif not items: 412 self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned") 413 break 414 415 @staticmethod 416 def map_item(item): 417 """ 418 Convert item object to 4CAT-ready 
data object 419 420 :param dict item: item to parse 421 :return dict: 4CAT-compatible item object 422 """ 423 unmapped_data = [] 424 425 # Add link to post; this is a Bluesky-specific URL and may not always be accurate 426 link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1]) 427 author_profile = f"https://bsky.app/profile/{item['author']['handle']}" 428 429 created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt"))) 430 431 # Tags 432 tags = set() 433 links = set() 434 mentions_did = set() 435 has_poll = False 436 if item["record"].get("facets"): 437 for facet in item["record"].get("facets"): 438 for feature in facet.get("features"): 439 if feature.get("tag"): 440 tags.add(feature.get("tag")) 441 elif feature.get("uri"): 442 links.add(feature.get("uri")) 443 elif feature.get("did"): 444 mentions_did.add(feature.get("did")) 445 elif feature.get("number"): 446 has_poll = True 447 else: 448 unmapped_data.append({"loc": "record.facets.features", "obj": feature}) 449 if "features" not in facet: 450 unmapped_data.append({"loc": "record.facets", "obj": facet}) 451 452 # Embeds are in both the item and the record; so far these always contain same item 453 embeded_links = set() 454 embeded_images = set() 455 image_references = set() 456 quoted_link = None 457 quoted_user = None 458 quoted_ref = None 459 possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})] 460 while possible_embeds: 461 embed = possible_embeds.pop(0) 462 if not embed: 463 continue 464 465 py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None) 466 if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]: 467 # contains post plus additional media 468 for key, media_ob in embed.items(): 469 possible_embeds.append(media_ob) 470 471 elif "images" in embed: # py_type in ["app.bsky.embed.images#view", 
"app.bsky.embed.images", "app.bsky.embed.images#main"] 472 for img_ob in embed["images"]: 473 img_link = img_ob.get("fullsize", img_ob.get("thumb")) 474 if img_link: 475 embeded_images.add(img_link) 476 elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")): 477 # ob.get("image").get("ref").get("link") will have a reference that could be linked via API 478 # BUT ref has already been obtained in other embeds... 479 image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link"))) 480 else: 481 unmapped_data.append({"loc": "embed.images", "obj": img_ob}) 482 elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]: 483 # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more) 484 if embed.get("thumbnail"): 485 embeded_images.add(embed.get("thumbnail")) 486 elif embed.get("video", {}).get("ref", {}).get("link"): 487 image_references.add(embed.get("video", {}).get("ref", {}).get("link")) 488 else: 489 # No thumb for video 490 pass 491 elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"] 492 # Quoted post 493 # Note: these may also contain images that would be seen, but are not part of the original post 494 if embed["record"].get("author", embed["record"].get("creator")): 495 if "handle" not in embed["record"].get("author", embed["record"].get("creator")): 496 # User may not be able to see original post 497 if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]: 498 quoted_link = "VIEWER BLOCKED BY AUTHOR" 499 else: 500 # New unknown 501 unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))}) 502 else: 503 # author seems to be a quoted post while creator a quoted "list" 504 quoted_user = embed["record"].get("author", 
embed["record"].get("creator"))["handle"] 505 quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1]) 506 elif embed["record"].get("not_found"): 507 quoted_link = "DELETED" 508 # We do have the DID, but this information is not normally displayed 509 # quoted_user = embed["record"]['uri'].split("/")[2] 510 elif embed["record"].get("detached"): 511 quoted_link = "REMOVED BY AUTHOR" 512 else: 513 quoted_ref = embed["record"]['uri'] 514 elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"] 515 if embed["external"].get("uri"): 516 embeded_links.add(embed["external"].get("uri")) 517 if embed["external"].get("thumb"): 518 if isinstance(embed["external"]["thumb"], str): 519 embeded_images.add(embed["external"]["thumb"]) 520 else: 521 image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", "")) 522 else: 523 # No thumb for link 524 pass 525 else: 526 unmapped_data.append({"loc": f"embed.{py_type}", 527 "obj": embed}) 528 529 # Replies allowed 530 # https://docs.bsky.app/docs/tutorials/thread-gates 531 # threadgate object does not appear to differentiate between the types of replies allowed 532 replies_allowed = True if not item["threadgate"] else False 533 534 # Labels (content moderation) 535 labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]]) 536 if item["record"].get("labels"): 537 labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])]) 538 539 # Language 540 languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs")) 541 542 # Missing references 543 if any([ref for ref in image_references if ref not in "".join(embeded_images)]): 544 unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]}) 545 if quoted_ref: 546 if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY 
AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link): 547 unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref}) 548 549 # Reference Posts (expanded to include handles during collection) 550 # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did" 551 # Mentions 552 mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])] 553 # Reply to 554 replied_to_post = None 555 replied_to_user = None 556 if item["record"].get("reply"): 557 if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages: 558 replied_to_user = item["4CAT_metadata"]["reply_to"] 559 else: 560 # Use DID, though this will not create a working link 561 replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2] 562 replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1]) 563 564 # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata 565 # if item["record"].get("entities"): 566 # unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]}) 567 568 # Author tags, not hashtags, not seen, very rarely used 569 # if item["record"].get("tags"): 570 # unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")}) 571 572 if unmapped_data: 573 # TODO: Use MappedItem message; currently it is not called... 
574 config.with_db() 575 config.db.log.warning(f"Bluesky new mappings ({item['uri']}): {unmapped_data}") 576 577 return MappedItem({ 578 "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(), 579 "query": item["4CAT_metadata"]["query"], 580 "rank": item["4CAT_metadata"]["rank"], 581 "id": item["uri"], 582 "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"], 583 "created_at": created_at.isoformat(), 584 "author": item["author"]["handle"], 585 "author_id": item["author"]["did"], 586 "body": item["record"]["text"], 587 "link": link, 588 "tags": ",".join(tags), 589 "like_count": item["like_count"], 590 "quote_count": item["quote_count"], 591 "reply_count": item["reply_count"], 592 "repost_count": item["repost_count"], 593 "quoted_post": quoted_link if quoted_link else "", 594 "quoted_user": quoted_user if quoted_user else "", 595 "replied_to_post": replied_to_post if replied_to_post else "", 596 "replied_to_user": replied_to_user if replied_to_user else "", 597 "replies_allowed": replies_allowed, 598 "mentions": ",".join(mentions), 599 "links": ",".join(embeded_links | links), 600 "images": ",".join(embeded_images), 601 "labels": ",".join(labels), 602 "has_poll": has_poll, 603 "languages": languages, 604 605 "author_display_name": item["author"]["display_name"], 606 "author_profile": author_profile, 607 "author_avatar": item["author"]["avatar"], 608 "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False), 609 610 "timestamp": created_at.timestamp(), 611 }, message=f"Bluesky new mappings: {unmapped_data}") 612 613 @staticmethod 614 def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True): 615 """ 616 Bluesky datetime string to datetime object. 617 618 Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string. 
619 620 :param str datetime_string: The datetime string to convert 621 :param str mode: The mode to return the datetime object in [datetime, iso_string] 622 :param bool raise_error: Raise error if unable to parse else return datetime_string 623 :return datetime/str: The converted datetime object 624 """ 625 try: 626 datetime_object = parser.isoparse(datetime_string) 627 except ValueError as e: 628 if raise_error: 629 raise e 630 return datetime_string 631 632 if mode == "datetime": 633 return datetime_object 634 elif mode == "iso_string": 635 return datetime_object.isoformat() 636 637 638 @staticmethod 639 def get_bsky_link(handle, post_id): 640 """ 641 Get link to Bluesky post 642 """ 643 return f"https://bsky.app/profile/{handle}/post/{post_id}" 644 645 @staticmethod 646 def bsky_get_handle_from_did(client, did): 647 """ 648 Get handle from DID 649 """ 650 tries = 0 651 while True: 652 try: 653 user_profile = client.app.bsky.actor.get_profile({"actor": did}) 654 if user_profile: 655 return user_profile.handle 656 else: 657 return None 658 except InvokeTimeoutError as e: 659 # Network error; try again 660 tries += 1 661 time.sleep(1) 662 if tries > 3: 663 return None 664 continue 665 except BadRequestError as e: 666 if e.response.content.message: 667 return e.response.content.message 668 return None 669 670 @staticmethod 671 def bsky_login(username, password, session_id): 672 """ 673 Login to Bluesky 674 675 :param str username: Username for Bluesky 676 :param str password: Password for Bluesky 677 :param str session_id: Session ID to use for login 678 :return Client: Client object with login credentials 679 """ 680 if not session_id: 681 session_id = SearchBluesky.create_session_id(username, password) 682 elif (not username or not password) and not session_id: 683 raise ValueError("Must provide both username and password or else session_id.") 684 685 session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), "bsky_" + session_id + 
".session") 686 687 def on_session_change(event: SessionEvent, session: Session) -> None: 688 """ 689 Save session to file; atproto session change event handler should keep the session up to date 690 691 https://atproto.blue/en/latest/atproto_client/auth.html 692 """ 693 print('Session changed:', event, repr(session)) 694 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 695 print('Saving changed session') 696 with session_path.open("w") as session_file: 697 session_file.write(session.export()) 698 699 client = Client() 700 client.on_session_change(on_session_change) 701 if session_path.exists(): 702 with session_path.open() as session_file: 703 session_string = session_file.read() 704 client.login(session_string=session_string) 705 else: 706 # Were not able to log in via session string; login with username and password 707 client.login(login=username, password=password) 708 return client 709 710 @staticmethod 711 def create_session_id(username, password): 712 """ 713 Generate a filename for the session file 714 715 This is a combination of username and password, but hashed 716 so that one cannot actually derive someone's information. 717 718 :param str username: Username for Bluesky 719 :param str password: Password for Bluesky 720 :return str: A hash value derived from the input 721 """ 722 hash_base = username.strip() + str(password).strip() 723 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
24class SearchBluesky(Search): 25 """ 26 Search for posts in Bluesky 27 """ 28 type = "bsky-search" # job ID 29 category = "Search" # category 30 title = "Bluesky Search" # title displayed in UI 31 description = "Collects Bluesky posts via its API." # description displayed in UI 32 extension = "ndjson" # extension of result file, used internally and in UI 33 is_local = False # Whether this datasource is locally scraped 34 is_static = False # Whether this datasource is still updated 35 36 config = { 37 "bsky-search.max_results": { 38 "type": UserInput.OPTION_TEXT, 39 "help": "Maximum results per query", 40 "coerce_type": int, 41 "min": 0, 42 "default": 50000, 43 "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited." 44 } 45 } 46 47 handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"] 48 49 @classmethod 50 def get_options(cls, parent_dataset=None, user=None): 51 """ 52 Get processor options 53 54 Just updates the description of the entities field based on the 55 configured max entities. 56 57 :param DataSet parent_dataset: An object representing the dataset that 58 the processor would be run on 59 :param User user: Flask user the options will be displayed for, in 60 case they are requested for display in the 4CAT web interface. This can 61 be used to show some options only to privileges users. 62 """ 63 options = { 64 "intro": { 65 "type": UserInput.OPTION_INFO, 66 "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server " 67 "and stored there while data is fetched. After the dataset has been created your credentials " 68 "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)." 
69 }, 70 "username": { 71 "type": UserInput.OPTION_TEXT, 72 "help": "Bluesky Username", 73 "cache": True, 74 "sensitive": True, 75 }, 76 "password": { 77 "type": UserInput.OPTION_TEXT, 78 "help": "Bluesky Password", 79 "cache": True, # tells the frontend to cache this value 80 "sensitive": True, # tells the backend to delete this value after use 81 "password": True, # tells the frontend this is a password type 82 }, 83 "divider": { 84 "type": UserInput.OPTION_DIVIDER 85 }, 86 "query": { 87 "type": UserInput.OPTION_TEXT_LARGE, 88 "help": "Search Queries", 89 "tooltip": "Separate with commas or line breaks." 90 }, 91 "max_posts": { 92 "type": UserInput.OPTION_TEXT, 93 "help": "Max posts per query", 94 "min": 1, 95 "default": 100 96 }, 97 "daterange": { 98 "type": UserInput.OPTION_DATERANGE, 99 "help": "Date range", 100 "tooltip": "The date range for the search. No date range will search all posts." 101 }, 102 } 103 104 # Update the max_posts setting from config 105 max_posts = int(config.get('bsky-search.max_results', 100, user=user)) 106 if max_posts == 0: 107 # This is potentially madness 108 options["max_posts"]["tooltip"] = "Set to 0 to collect all posts." 109 options['max_posts']['min'] = 0 110 else: 111 options["max_posts"]["max"] = max_posts 112 options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts 113 114 return options 115 116 @staticmethod 117 def validate_query(query, request, user): 118 """ 119 Validate Bluesky query 120 121 :param dict query: Query parameters, from client-side. 
122 :param request: Flask request 123 :param User user: User object of user who has submitted the query 124 :return dict: Safe query parameters 125 """ 126 # no query 4 u 127 if not query.get("query", "").strip(): 128 raise QueryParametersException("You must provide a search query.") 129 130 if not query.get("username", None) or not query.get("password", None) : 131 raise QueryParametersException("You need to provide valid Bluesky login credentials first.") 132 133 # Test login credentials 134 session_id = SearchBluesky.create_session_id(query["username"], query["password"]) 135 try: 136 SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id) 137 except UnauthorizedError as e: 138 raise QueryParametersException("Invalid Bluesky login credentials.") 139 except RequestException as e: 140 if e.response.content.message == 'Rate Limit Exceeded': 141 lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"])) 142 raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.") 143 else: 144 raise QueryParametersException(f"Bluesky login failed. 
{e.response.content.message}") 145 146 # sanitize query 147 sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()] 148 149 # the dates need to make sense as a range to search within 150 min_date, max_date = query.get("daterange") 151 if min_date and max_date and min_date > max_date: 152 raise QueryParametersException("The start date must be before the end date.") 153 154 # Only check this if not already confirmed by the frontend 155 posts_per_second = 55 # gathered from simply checking start/end times of logs 156 if not query.get("frontend-confirm"): 157 # Estimate is not returned; use max_posts as a rough estimate 158 max_posts = query.get("max_posts", 100) 159 expected_tweets = query.get("max_posts", 100) * len(sanitized_query) 160 # Warn if process may take more than ~1 hours 161 if expected_tweets > (posts_per_second * 3600): 162 expected_time = timify_long(expected_tweets / posts_per_second) 163 raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?") 164 elif max_posts == 0 and not min_date: 165 raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?") 166 elif max_posts == 0: 167 raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query may take a long time to complete. Do you want to continue?") 168 169 return { 170 "max_posts": query.get("max_posts"), 171 "query": ",".join(sanitized_query), 172 "username": query.get("username"), 173 "password": query.get("password"), 174 "session_id": session_id, 175 "min_date": min_date, 176 "max_date": max_date, 177 } 178 179 def get_items(self, query): 180 """ 181 Execute a query; get messages for given parameters 182 183 Basically a wrapper around execute_queries() to call it with asyncio. 
    def get_items(self, query):
        """
        Execute the stored queries and yield collected Bluesky posts.

        Logs in (re-using a stored session when available), then runs each
        comma-separated query against the search endpoint, paginating with the
        API cursor. Queries that hit the API's undocumented cursor limit
        (~10k posts) are re-queued with an `until` bound so collection can
        continue; posts that fail atproto model validation are skipped by
        temporarily collecting one post at a time.

        :param dict query: Query parameters, as part of the DataSet object
        :return generator: Yields collected posts as dicts (with added
            "4CAT_metadata" key), in the order the API returns them
        """
        # Credentials are deleted after dataset creation, so a missing session
        # AND missing credentials means collection cannot proceed
        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")

        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
        try:
            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id)
        except (UnauthorizedError, RequestException, BadRequestError) as e:
            self.dataset.log(f"Bluesky login failed: {e}")
            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")

        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")

        # max_posts == 0 means unlimited; API page size caps at 100
        max_posts = query.get("max_posts", 100)
        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts

        # Handle reference mapping; user references use did instead of dynamic handle
        did_to_handle = {}

        query_parameters = {
            "limit": limit,
        }

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
        if self.parameters.get("max_date"):
            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')

        queries = query.get("query").split(",")
        num_queries = len(queries)
        total_posts = 0
        i = 0  # 1-based index of the current (distinct) query, for status messages
        last_query = None  # used to detect re-queued continuation queries
        last_date = None  # indexed_at of the most recently seen post; continuation bound
        while queries:
            query = queries.pop(0)
            if query == last_query:
                # Check if there are continued posts from the last query;
                # continue time-based pagination from the last seen indexed_at
                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
            else:
                # New query; reset per-query dedupe set and counters
                query_post_ids = set()
                i += 1
                rank = 0
                last_query = query
                last_date = None
                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
                query_requests = 0

            query_parameters["q"] = query
            cursor = None  # Start with no cursor (first page)
            search_for_invalid_post = False  # True while stepping post-by-post past an invalid post
            invalid_post_counter = 0
            while True:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
                # Query posts, including pagination (cursor for next page)
                tries = 0
                response = None
                while tries < 3:
                    query_parameters["cursor"] = cursor
                    try:
                        response = client.app.bsky.feed.search_posts(params=query_parameters)
                        break
                    except ModelError as e:
                        # Post validation error; one post is unable to be read
                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
                        # order to collect post by post, invalid post is identified again, we switch back to higher
                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
                        # is raised with detail referring to a server error 502 InternalServerError, we catch that and
                        # add the query back to the queue with a new "until" date to continue the query
                        # https://github.com/bluesky-social/atproto/issues/3446
                        if not search_for_invalid_post:
                            # New invalid post, search and skip
                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
                            search_for_invalid_post = True
                            # Currently we must search post by post to find the invalid post
                            query_parameters["limit"] = 1
                        else:
                            # Found invalid post, skip, reset counters
                            self.dataset.log(
                                f"Invalid post identified; skipping and continue with query as normal: {e}")
                            search_for_invalid_post = False
                            # Reset limit to normal
                            query_parameters["limit"] = limit
                            invalid_post_counter = 0
                            # Advance cursor past the offending post
                            cursor = str(int(cursor) + 1) if cursor else None
                        # Re-query with new cursor & limit
                        continue

                    except InvokeTimeoutError as e:
                        # Timeout error, but can occur for odd queries with no results
                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
                        time.sleep(1)
                        tries += 2
                        continue
                    except NetworkError as e:
                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
                        # Re-queue the query so collection continues from last_date
                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
                        time.sleep(1 + (tries * 10))
                        queries.insert(0, query)
                        break

                if not response:
                    # Expected from NetworkError, but the query will have been added back to the queue
                    # If not, then there was a problem with the query
                    if len(queries) == 0:
                        self.dataset.update_status(f"Error collecting posts from Bluesky; see log for details", is_final=True)
                    if query not in queries:
                        # Query was not added back; there was an unexpected issue with the query itself
                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
                    break

                query_requests += 1
                items = response['posts'] if hasattr(response, 'posts') else []

                if search_for_invalid_post:
                    invalid_post_counter += 1
                    if invalid_post_counter >= 100:
                        # Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
                        self.dataset.log(f"Unable to identify invalid post; discontinuing search")
                        query_parameters["limit"] = limit
                        search_for_invalid_post = False
                        invalid_post_counter = 0

                if not items:
                    # Sometimes no post is returned, but there still may be posts following
                    self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
                    # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
                    cursor = str(int(cursor) + 1) if cursor else None
                    continue

                new_posts = 0
                # Handle the posts
                for item in items:
                    if 0 < max_posts <= rank:
                        # Collected enough for this query
                        break

                    if self.interrupted:
                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")

                    post = item.model_dump()
                    post_id = post["uri"]
                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
                    if post_id in query_post_ids:
                        # Skip duplicate posts
                        continue

                    new_posts += 1
                    query_post_ids.add(post_id)

                    # Add user handles from references
                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
                    # Mentions: resolve mentioned DIDs to handles (cached in did_to_handle)
                    mentions = []
                    if post["record"].get("facets"):
                        for facet in post["record"]["facets"]:
                            for feature in facet.get("features", {}):
                                if feature.get("did"):
                                    if feature["did"] in did_to_handle:
                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
                                    else:
                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
                                        if handle:
                                            if handle.lower() in self.handle_lookup_error_messages:
                                                # "handle" is actually an API error message (e.g. suspended account)
                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
                                            mentions.append({"did": feature["did"], "handle": handle})
                                            did_to_handle[feature["did"]] = handle
                                        else:
                                            mentions.append({"did": feature["did"], "handle": None})
                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
                    # Reply to: resolve the parent post's author DID to a handle
                    reply_to_handle = None
                    if post["record"].get("reply"):
                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
                        if reply_to_did in did_to_handle:
                            reply_to_handle = did_to_handle[reply_to_did]
                        else:
                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
                            if handle:
                                if handle.lower() in self.handle_lookup_error_messages:
                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
                                reply_to_handle = handle
                                did_to_handle[reply_to_did] = handle
                            else:
                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")

                    # Attach collection metadata consumed later by map_item
                    post.update({"4CAT_metadata": {
                        "collected_at": datetime.now().timestamp(),
                        "query": query,
                        "rank": rank,
                        "mentions": mentions,
                        "reply_to": reply_to_handle if reply_to_handle else None,
                    }})
                    rank += 1
                    yield post
                    total_posts += 1

                # Check if there is a cursor for the next page
                cursor = response['cursor']
                if max_posts != 0:
                    self.dataset.update_progress(total_posts / (max_posts * num_queries))

                if 0 < max_posts <= rank:
                    self.dataset.update_status(
                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
                    break

                if not cursor:
                    if new_posts:
                        # Bluesky API seems to stop around 10000 posts and not return a cursor
                        # Re-query with the same query to get the next set of posts using last_date (set above)
                        self.dataset.log(f"Query {query}: {query_requests} requests")
                        queries.insert(0, query)
                    else:
                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")

                    if rank:
                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
                    break  # No more pages, stop the loop
                elif not items:
                    # NOTE(review): this branch appears unreachable — an empty items
                    # list is already handled (with `continue`) above; confirm
                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
                    break
data object 420 421 :param dict item: item to parse 422 :return dict: 4CAT-compatible item object 423 """ 424 unmapped_data = [] 425 426 # Add link to post; this is a Bluesky-specific URL and may not always be accurate 427 link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1]) 428 author_profile = f"https://bsky.app/profile/{item['author']['handle']}" 429 430 created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt"))) 431 432 # Tags 433 tags = set() 434 links = set() 435 mentions_did = set() 436 has_poll = False 437 if item["record"].get("facets"): 438 for facet in item["record"].get("facets"): 439 for feature in facet.get("features"): 440 if feature.get("tag"): 441 tags.add(feature.get("tag")) 442 elif feature.get("uri"): 443 links.add(feature.get("uri")) 444 elif feature.get("did"): 445 mentions_did.add(feature.get("did")) 446 elif feature.get("number"): 447 has_poll = True 448 else: 449 unmapped_data.append({"loc": "record.facets.features", "obj": feature}) 450 if "features" not in facet: 451 unmapped_data.append({"loc": "record.facets", "obj": facet}) 452 453 # Embeds are in both the item and the record; so far these always contain same item 454 embeded_links = set() 455 embeded_images = set() 456 image_references = set() 457 quoted_link = None 458 quoted_user = None 459 quoted_ref = None 460 possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})] 461 while possible_embeds: 462 embed = possible_embeds.pop(0) 463 if not embed: 464 continue 465 466 py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None) 467 if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]: 468 # contains post plus additional media 469 for key, media_ob in embed.items(): 470 possible_embeds.append(media_ob) 471 472 elif "images" in embed: # py_type in ["app.bsky.embed.images#view", 
"app.bsky.embed.images", "app.bsky.embed.images#main"] 473 for img_ob in embed["images"]: 474 img_link = img_ob.get("fullsize", img_ob.get("thumb")) 475 if img_link: 476 embeded_images.add(img_link) 477 elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")): 478 # ob.get("image").get("ref").get("link") will have a reference that could be linked via API 479 # BUT ref has already been obtained in other embeds... 480 image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link"))) 481 else: 482 unmapped_data.append({"loc": "embed.images", "obj": img_ob}) 483 elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]: 484 # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more) 485 if embed.get("thumbnail"): 486 embeded_images.add(embed.get("thumbnail")) 487 elif embed.get("video", {}).get("ref", {}).get("link"): 488 image_references.add(embed.get("video", {}).get("ref", {}).get("link")) 489 else: 490 # No thumb for video 491 pass 492 elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"] 493 # Quoted post 494 # Note: these may also contain images that would be seen, but are not part of the original post 495 if embed["record"].get("author", embed["record"].get("creator")): 496 if "handle" not in embed["record"].get("author", embed["record"].get("creator")): 497 # User may not be able to see original post 498 if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]: 499 quoted_link = "VIEWER BLOCKED BY AUTHOR" 500 else: 501 # New unknown 502 unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))}) 503 else: 504 # author seems to be a quoted post while creator a quoted "list" 505 quoted_user = embed["record"].get("author", 
embed["record"].get("creator"))["handle"] 506 quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1]) 507 elif embed["record"].get("not_found"): 508 quoted_link = "DELETED" 509 # We do have the DID, but this information is not normally displayed 510 # quoted_user = embed["record"]['uri'].split("/")[2] 511 elif embed["record"].get("detached"): 512 quoted_link = "REMOVED BY AUTHOR" 513 else: 514 quoted_ref = embed["record"]['uri'] 515 elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"] 516 if embed["external"].get("uri"): 517 embeded_links.add(embed["external"].get("uri")) 518 if embed["external"].get("thumb"): 519 if isinstance(embed["external"]["thumb"], str): 520 embeded_images.add(embed["external"]["thumb"]) 521 else: 522 image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", "")) 523 else: 524 # No thumb for link 525 pass 526 else: 527 unmapped_data.append({"loc": f"embed.{py_type}", 528 "obj": embed}) 529 530 # Replies allowed 531 # https://docs.bsky.app/docs/tutorials/thread-gates 532 # threadgate object does not appear to differentiate between the types of replies allowed 533 replies_allowed = True if not item["threadgate"] else False 534 535 # Labels (content moderation) 536 labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]]) 537 if item["record"].get("labels"): 538 labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])]) 539 540 # Language 541 languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs")) 542 543 # Missing references 544 if any([ref for ref in image_references if ref not in "".join(embeded_images)]): 545 unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]}) 546 if quoted_ref: 547 if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY 
AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link): 548 unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref}) 549 550 # Reference Posts (expanded to include handles during collection) 551 # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did" 552 # Mentions 553 mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])] 554 # Reply to 555 replied_to_post = None 556 replied_to_user = None 557 if item["record"].get("reply"): 558 if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages: 559 replied_to_user = item["4CAT_metadata"]["reply_to"] 560 else: 561 # Use DID, though this will not create a working link 562 replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2] 563 replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1]) 564 565 # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata 566 # if item["record"].get("entities"): 567 # unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]}) 568 569 # Author tags, not hashtags, not seen, very rarely used 570 # if item["record"].get("tags"): 571 # unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")}) 572 573 if unmapped_data: 574 # TODO: Use MappedItem message; currently it is not called... 
575 config.with_db() 576 config.db.log.warning(f"Bluesky new mappings ({item['uri']}): {unmapped_data}") 577 578 return MappedItem({ 579 "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(), 580 "query": item["4CAT_metadata"]["query"], 581 "rank": item["4CAT_metadata"]["rank"], 582 "id": item["uri"], 583 "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"], 584 "created_at": created_at.isoformat(), 585 "author": item["author"]["handle"], 586 "author_id": item["author"]["did"], 587 "body": item["record"]["text"], 588 "link": link, 589 "tags": ",".join(tags), 590 "like_count": item["like_count"], 591 "quote_count": item["quote_count"], 592 "reply_count": item["reply_count"], 593 "repost_count": item["repost_count"], 594 "quoted_post": quoted_link if quoted_link else "", 595 "quoted_user": quoted_user if quoted_user else "", 596 "replied_to_post": replied_to_post if replied_to_post else "", 597 "replied_to_user": replied_to_user if replied_to_user else "", 598 "replies_allowed": replies_allowed, 599 "mentions": ",".join(mentions), 600 "links": ",".join(embeded_links | links), 601 "images": ",".join(embeded_images), 602 "labels": ",".join(labels), 603 "has_poll": has_poll, 604 "languages": languages, 605 606 "author_display_name": item["author"]["display_name"], 607 "author_profile": author_profile, 608 "author_avatar": item["author"]["avatar"], 609 "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False), 610 611 "timestamp": created_at.timestamp(), 612 }, message=f"Bluesky new mappings: {unmapped_data}") 613 614 @staticmethod 615 def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True): 616 """ 617 Bluesky datetime string to datetime object. 618 619 Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string. 
620 621 :param str datetime_string: The datetime string to convert 622 :param str mode: The mode to return the datetime object in [datetime, iso_string] 623 :param bool raise_error: Raise error if unable to parse else return datetime_string 624 :return datetime/str: The converted datetime object 625 """ 626 try: 627 datetime_object = parser.isoparse(datetime_string) 628 except ValueError as e: 629 if raise_error: 630 raise e 631 return datetime_string 632 633 if mode == "datetime": 634 return datetime_object 635 elif mode == "iso_string": 636 return datetime_object.isoformat() 637 638 639 @staticmethod 640 def get_bsky_link(handle, post_id): 641 """ 642 Get link to Bluesky post 643 """ 644 return f"https://bsky.app/profile/{handle}/post/{post_id}" 645 646 @staticmethod 647 def bsky_get_handle_from_did(client, did): 648 """ 649 Get handle from DID 650 """ 651 tries = 0 652 while True: 653 try: 654 user_profile = client.app.bsky.actor.get_profile({"actor": did}) 655 if user_profile: 656 return user_profile.handle 657 else: 658 return None 659 except InvokeTimeoutError as e: 660 # Network error; try again 661 tries += 1 662 time.sleep(1) 663 if tries > 3: 664 return None 665 continue 666 except BadRequestError as e: 667 if e.response.content.message: 668 return e.response.content.message 669 return None 670 671 @staticmethod 672 def bsky_login(username, password, session_id): 673 """ 674 Login to Bluesky 675 676 :param str username: Username for Bluesky 677 :param str password: Password for Bluesky 678 :param str session_id: Session ID to use for login 679 :return Client: Client object with login credentials 680 """ 681 if not session_id: 682 session_id = SearchBluesky.create_session_id(username, password) 683 elif (not username or not password) and not session_id: 684 raise ValueError("Must provide both username and password or else session_id.") 685 686 session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), "bsky_" + session_id + 
".session") 687 688 def on_session_change(event: SessionEvent, session: Session) -> None: 689 """ 690 Save session to file; atproto session change event handler should keep the session up to date 691 692 https://atproto.blue/en/latest/atproto_client/auth.html 693 """ 694 print('Session changed:', event, repr(session)) 695 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 696 print('Saving changed session') 697 with session_path.open("w") as session_file: 698 session_file.write(session.export()) 699 700 client = Client() 701 client.on_session_change(on_session_change) 702 if session_path.exists(): 703 with session_path.open() as session_file: 704 session_string = session_file.read() 705 client.login(session_string=session_string) 706 else: 707 # Were not able to log in via session string; login with username and password 708 client.login(login=username, password=password) 709 return client 710 711 @staticmethod 712 def create_session_id(username, password): 713 """ 714 Generate a filename for the session file 715 716 This is a combination of username and password, but hashed 717 so that one cannot actually derive someone's information. 718 719 :param str username: Username for Bluesky 720 :param str password: Password for Bluesky 721 :return str: A hash value derived from the input 722 """ 723 hash_base = username.strip() + str(password).strip() 724 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Search for posts in Bluesky
49 @classmethod 50 def get_options(cls, parent_dataset=None, user=None): 51 """ 52 Get processor options 53 54 Just updates the description of the entities field based on the 55 configured max entities. 56 57 :param DataSet parent_dataset: An object representing the dataset that 58 the processor would be run on 59 :param User user: Flask user the options will be displayed for, in 60 case they are requested for display in the 4CAT web interface. This can 61 be used to show some options only to privileges users. 62 """ 63 options = { 64 "intro": { 65 "type": UserInput.OPTION_INFO, 66 "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server " 67 "and stored there while data is fetched. After the dataset has been created your credentials " 68 "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)." 69 }, 70 "username": { 71 "type": UserInput.OPTION_TEXT, 72 "help": "Bluesky Username", 73 "cache": True, 74 "sensitive": True, 75 }, 76 "password": { 77 "type": UserInput.OPTION_TEXT, 78 "help": "Bluesky Password", 79 "cache": True, # tells the frontend to cache this value 80 "sensitive": True, # tells the backend to delete this value after use 81 "password": True, # tells the frontend this is a password type 82 }, 83 "divider": { 84 "type": UserInput.OPTION_DIVIDER 85 }, 86 "query": { 87 "type": UserInput.OPTION_TEXT_LARGE, 88 "help": "Search Queries", 89 "tooltip": "Separate with commas or line breaks." 90 }, 91 "max_posts": { 92 "type": UserInput.OPTION_TEXT, 93 "help": "Max posts per query", 94 "min": 1, 95 "default": 100 96 }, 97 "daterange": { 98 "type": UserInput.OPTION_DATERANGE, 99 "help": "Date range", 100 "tooltip": "The date range for the search. No date range will search all posts." 
101 }, 102 } 103 104 # Update the max_posts setting from config 105 max_posts = int(config.get('bsky-search.max_results', 100, user=user)) 106 if max_posts == 0: 107 # This is potentially madness 108 options["max_posts"]["tooltip"] = "Set to 0 to collect all posts." 109 options['max_posts']['min'] = 0 110 else: 111 options["max_posts"]["max"] = max_posts 112 options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts 113 114 return options
Get processor options
Just updates the limits and default of the "max posts per query" field based on the configured maximum number of results.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
- User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileged users.
116 @staticmethod 117 def validate_query(query, request, user): 118 """ 119 Validate Bluesky query 120 121 :param dict query: Query parameters, from client-side. 122 :param request: Flask request 123 :param User user: User object of user who has submitted the query 124 :return dict: Safe query parameters 125 """ 126 # no query 4 u 127 if not query.get("query", "").strip(): 128 raise QueryParametersException("You must provide a search query.") 129 130 if not query.get("username", None) or not query.get("password", None) : 131 raise QueryParametersException("You need to provide valid Bluesky login credentials first.") 132 133 # Test login credentials 134 session_id = SearchBluesky.create_session_id(query["username"], query["password"]) 135 try: 136 SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id) 137 except UnauthorizedError as e: 138 raise QueryParametersException("Invalid Bluesky login credentials.") 139 except RequestException as e: 140 if e.response.content.message == 'Rate Limit Exceeded': 141 lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"])) 142 raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.") 143 else: 144 raise QueryParametersException(f"Bluesky login failed. 
{e.response.content.message}") 145 146 # sanitize query 147 sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()] 148 149 # the dates need to make sense as a range to search within 150 min_date, max_date = query.get("daterange") 151 if min_date and max_date and min_date > max_date: 152 raise QueryParametersException("The start date must be before the end date.") 153 154 # Only check this if not already confirmed by the frontend 155 posts_per_second = 55 # gathered from simply checking start/end times of logs 156 if not query.get("frontend-confirm"): 157 # Estimate is not returned; use max_posts as a rough estimate 158 max_posts = query.get("max_posts", 100) 159 expected_tweets = query.get("max_posts", 100) * len(sanitized_query) 160 # Warn if process may take more than ~1 hours 161 if expected_tweets > (posts_per_second * 3600): 162 expected_time = timify_long(expected_tweets / posts_per_second) 163 raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?") 164 elif max_posts == 0 and not min_date: 165 raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?") 166 elif max_posts == 0: 167 raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query may take a long time to complete. Do you want to continue?") 168 169 return { 170 "max_posts": query.get("max_posts"), 171 "query": ",".join(sanitized_query), 172 "username": query.get("username"), 173 "password": query.get("password"), 174 "session_id": session_id, 175 "min_date": min_date, 176 "max_date": max_date, 177 }
Validate Bluesky query
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
    def get_items(self, query):
        """
        Execute a query; get messages for given parameters

        Logs in to Bluesky and collects posts for each requested query via
        the search API, handling pagination, skipping of invalid posts, and
        re-queueing of queries when the API stops returning a cursor (which
        happens around ~10k posts per query).

        :param dict query: Query parameters, as part of the DataSet object
        :return generator: Yields collected posts (dicts) as they are retrieved
        """
        # Cannot continue without either a stored session or full credentials
        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")

        # Prefer the cached session ID; otherwise derive one from the credentials
        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
        try:
            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id)
        except (UnauthorizedError, RequestException, BadRequestError) as e:
            self.dataset.log(f"Bluesky login failed: {e}")
            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")

        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")

        max_posts = query.get("max_posts", 100)
        # API returns at most 100 posts per request; max_posts == 0 means "no maximum"
        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts

        # Handle reference mapping; user references use did instead of dynamic handle
        did_to_handle = {}

        query_parameters = {
            "limit": limit,
        }

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
        if self.parameters.get("max_date"):
            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')

        queries = query.get("query").split(",")
        num_queries = len(queries)
        total_posts = 0
        i = 0
        last_query = None
        last_date = None
        while queries:
            query = queries.pop(0)
            if query == last_query:
                # Check if there are continued posts from the last query
                # NOTE(review): assumes last_date was set on a previous pass; a
                # re-queued query whose very first request failed would reach
                # this with last_date still None — confirm that cannot occur
                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
            else:
                # New query; reset per-query dedupe set and rank counter
                query_post_ids = set()
                i += 1
                rank = 0
                last_query = query
                last_date = None
                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
            query_requests = 0

            query_parameters["q"] = query
            cursor = None  # Start with no cursor (first page)
            search_for_invalid_post = False
            invalid_post_counter = 0
            while True:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
                # Query posts, including pagination (cursor for next page)
                tries = 0
                response = None
                while tries < 3:
                    query_parameters["cursor"] = cursor
                    try:
                        response = client.app.bsky.feed.search_posts(params=query_parameters)
                        break
                    except ModelError as e:
                        # Post validation error; one post is unable to be read
                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
                        # order to collect post by post, invalid post is identified again, we switch back to higher
                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
                        # add the query back to the queue with a new "until" date to continue the query
                        # https://github.com/bluesky-social/atproto/issues/3446
                        if not search_for_invalid_post:
                            # New invalid post, search and skip
                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
                            search_for_invalid_post = True
                            # Currently we must search post by post to find the invalid post
                            query_parameters["limit"] = 1
                        else:
                            # Found invalid post, skip, reset counters
                            self.dataset.log(
                                f"Invalid post identified; skipping and continue with query as normal: {e}")
                            search_for_invalid_post = False
                            # Reset limit to normal
                            query_parameters["limit"] = limit
                            invalid_post_counter = 0
                            # Bump the cursor by one to step past the invalid post
                            cursor = str(int(cursor) + 1) if cursor else None
                        # Re-query with new cursor & limit
                        continue

                    except InvokeTimeoutError as e:
                        # Timeout error, but can occur for odd queries with no results
                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
                        time.sleep(1)
                        tries += 2
                        continue
                    except NetworkError as e:
                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
                        time.sleep(1 + (tries * 10))
                        # Re-queue the query so collection can resume from last_date
                        queries.insert(0, query)
                        break

                if not response:
                    # Expected from NetworkError, but the query will have been added back to the queue
                    # If not, then there was a problem with the query
                    if len(queries) == 0:
                        self.dataset.update_status(f"Error collecting posts from Bluesky; see log for details", is_final=True)
                    if query not in queries:
                        # Query was not added back; there was an unexpected issue with the query itself
                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
                    break

                query_requests += 1
                items = response['posts'] if hasattr(response, 'posts') else []

                if search_for_invalid_post:
                    invalid_post_counter += 1
                    if invalid_post_counter >= 100:
                        # Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
                        self.dataset.log(f"Unable to identify invalid post; discontinuing search")
                        query_parameters["limit"] = limit
                        search_for_invalid_post = False
                        invalid_post_counter = 0

                if not items:
                    # Sometimes no post is returned, but there still may be posts following
                    self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
                    # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
                    cursor = str(int(cursor) + 1) if cursor else None
                    continue

                new_posts = 0
                # Handle the posts
                for item in items:
                    if 0 < max_posts <= rank:
                        # Per-query maximum reached
                        break

                    if self.interrupted:
                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")

                    post = item.model_dump()
                    post_id = post["uri"]
                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
                    if post_id in query_post_ids:
                        # Skip duplicate posts
                        continue

                    new_posts += 1
                    query_post_ids.add(post_id)

                    # Add user handles from references
                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
                    # Mentions: resolve mentioned DIDs to handles, cached in did_to_handle
                    mentions = []
                    if post["record"].get("facets"):
                        for facet in post["record"]["facets"]:
                            for feature in facet.get("features", {}):
                                if feature.get("did"):
                                    if feature["did"] in did_to_handle:
                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
                                    else:
                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
                                        if handle:
                                            if handle.lower() in self.handle_lookup_error_messages:
                                                # "handle" here is actually an API error message (account gone/suspended)
                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
                                            mentions.append({"did": feature["did"], "handle": handle})
                                            did_to_handle[feature["did"]] = handle
                                        else:
                                            mentions.append({"did": feature["did"], "handle": None})
                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
                    # Reply to
                    reply_to_handle = None
                    if post["record"].get("reply"):
                        # The parent URI contains the DID of the user being replied to
                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
                        if reply_to_did in did_to_handle:
                            reply_to_handle = did_to_handle[reply_to_did]
                        else:
                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
                            if handle:
                                if handle.lower() in self.handle_lookup_error_messages:
                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
                                reply_to_handle = handle
                                did_to_handle[reply_to_did] = handle
                            else:
                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")


                    post.update({"4CAT_metadata": {
                        "collected_at": datetime.now().timestamp(),
                        "query": query,
                        "rank": rank,
                        "mentions": mentions,
                        "reply_to": reply_to_handle if reply_to_handle else None,
                    }})
                    rank += 1
                    yield post
                    total_posts += 1

                # Check if there is a cursor for the next page
                cursor = response['cursor']
                if max_posts != 0:
                    self.dataset.update_progress(total_posts / (max_posts * num_queries))

                if 0 < max_posts <= rank:
                    self.dataset.update_status(
                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
                    break

                if not cursor:
                    if new_posts:
                        # Bluesky API seems to stop around 10000 posts and not return a cursor
                        # Re-query with the same query to get the next set of posts using last_date (set above)
                        self.dataset.log(f"Query {query}: {query_requests} requests")
                        queries.insert(0, query)
                    else:
                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")

                    if rank:
                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
                    break  # No more pages, stop the loop
                elif not items:
                    # NOTE(review): appears defensive; the earlier "if not items: ... continue"
                    # should make this branch unreachable — confirm
                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
                    break
Execute a query; get messages for given parameters
Logs in to Bluesky and collects posts for each query via the search API, yielding them as they are retrieved.
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Collected posts, yielded in the order they are retrieved from the API
    @staticmethod
    def map_item(item):
        """
        Convert item object to 4CAT-ready data object

        Flattens a collected Bluesky post (as stored by get_items) into the
        tabular fields 4CAT expects. Any structure that cannot be mapped is
        collected in unmapped_data and logged for later mapping work.

        :param dict item: item to parse
        :return dict: 4CAT-compatible item object
        """
        unmapped_data = []

        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"

        # Key name differs depending on how the record was serialised (snake_case vs camelCase)
        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at", item["record"].get("createdAt")))

        # Tags, links, mentions and poll markers all live in the record's "facets"
        tags = set()
        links = set()
        mentions_did = set()
        has_poll = False
        if item["record"].get("facets"):
            for facet in item["record"].get("facets"):
                # NOTE(review): facet.get("features") may be None, which would raise
                # here; the '"features" not in facet' check below suggests that case
                # is expected to exist — confirm
                for feature in facet.get("features"):
                    if feature.get("tag"):
                        tags.add(feature.get("tag"))
                    elif feature.get("uri"):
                        links.add(feature.get("uri"))
                    elif feature.get("did"):
                        mentions_did.add(feature.get("did"))
                    elif feature.get("number"):
                        has_poll = True
                    else:
                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
                if "features" not in facet:
                    unmapped_data.append({"loc": "record.facets", "obj": facet})

        # Embeds are in both the item and the record; so far these always contain same item
        embeded_links = set()
        embeded_images = set()
        image_references = set()
        quoted_link = None
        quoted_user = None
        quoted_ref = None
        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
        while possible_embeds:
            embed = possible_embeds.pop(0)
            if not embed:
                continue

            # Embed type key differs by serialisation: "py_type" (atproto model) vs "$type" (raw record)
            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
                # contains post plus additional media; process each nested embed in turn
                for key, media_ob in embed.items():
                    possible_embeds.append(media_ob)

            elif "images" in embed:  # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
                for img_ob in embed["images"]:
                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
                    if img_link:
                        embeded_images.add(img_link)
                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
                        # BUT ref has already been obtained in other embeds...
                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
                    else:
                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
                if embed.get("thumbnail"):
                    embeded_images.add(embed.get("thumbnail"))
                elif embed.get("video", {}).get("ref", {}).get("link"):
                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
                else:
                    # No thumb for video
                    pass
            elif "record" in embed:  # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
                # Quoted post
                # Note: these may also contain images that would be seen, but are not part of the original post
                if embed["record"].get("author", embed["record"].get("creator")):
                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
                        # User may not be able to see original post
                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
                        else:
                            # New unknown
                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
                    else:
                        # author seems to be a quoted post while creator a quoted "list"
                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
                elif embed["record"].get("not_found"):
                    quoted_link = "DELETED"
                    # We do have the DID, but this information is not normally displayed
                    # quoted_user = embed["record"]['uri'].split("/")[2]
                elif embed["record"].get("detached"):
                    quoted_link = "REMOVED BY AUTHOR"
                else:
                    quoted_ref = embed["record"]['uri']
            elif "external" in embed:  # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
                if embed["external"].get("uri"):
                    embeded_links.add(embed["external"].get("uri"))
                if embed["external"].get("thumb"):
                    if isinstance(embed["external"]["thumb"], str):
                        embeded_images.add(embed["external"]["thumb"])
                    else:
                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
                else:
                    # No thumb for link
                    pass
            else:
                unmapped_data.append({"loc": f"embed.{py_type}",
                                      "obj": embed})

        # Replies allowed
        # https://docs.bsky.app/docs/tutorials/thread-gates
        # threadgate object does not appear to differentiate between the types of replies allowed
        replies_allowed = True if not item["threadgate"] else False

        # Labels (content moderation)
        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
        if item["record"].get("labels"):
            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values", [])])

        # Language
        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))

        # Missing references: image refs that never resolved to a usable URL
        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
        if quoted_ref:
            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})

        # Reference Posts (expanded to include handles during collection)
        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
        # Mentions: fall back to the DID when the stored handle is one of the known lookup error messages
        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
        # Reply to
        replied_to_post = None
        replied_to_user = None
        if item["record"].get("reply"):
            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
                replied_to_user = item["4CAT_metadata"]["reply_to"]
            else:
                # Use DID, though this will not create a working link
                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])

        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
        # if item["record"].get("entities"):
        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})

        # Author tags, not hashtags, not seen, very rarely used
        # if item["record"].get("tags"):
        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})

        if unmapped_data:
            # TODO: Use MappedItem message; currently it is not called...
            config.with_db()
            config.db.log.warning(f"Bluesky new mappings ({item['uri']}): {unmapped_data}")

        return MappedItem({
            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
            "query": item["4CAT_metadata"]["query"],
            "rank": item["4CAT_metadata"]["rank"],
            "id": item["uri"],
            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
            "created_at": created_at.isoformat(),
            "author": item["author"]["handle"],
            "author_id": item["author"]["did"],
            "body": item["record"]["text"],
            "link": link,
            "tags": ",".join(tags),
            "like_count": item["like_count"],
            "quote_count": item["quote_count"],
            "reply_count": item["reply_count"],
            "repost_count": item["repost_count"],
            "quoted_post": quoted_link if quoted_link else "",
            "quoted_user": quoted_user if quoted_user else "",
            "replied_to_post": replied_to_post if replied_to_post else "",
            "replied_to_user": replied_to_user if replied_to_user else "",
            "replies_allowed": replies_allowed,
            "mentions": ",".join(mentions),
            "links": ",".join(embeded_links | links),
            "images": ",".join(embeded_images),
            "labels": ",".join(labels),
            "has_poll": has_poll,
            "languages": languages,

            "author_display_name": item["author"]["display_name"],
            "author_profile": author_profile,
            "author_avatar": item["author"]["avatar"],
            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),

            "timestamp": created_at.timestamp(),
        }, message=f"Bluesky new mappings: {unmapped_data}")
Convert item object to 4CAT-ready data object
Parameters
- dict item: item to parse
Returns
4CAT-compatible item object
614 @staticmethod 615 def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True): 616 """ 617 Bluesky datetime string to datetime object. 618 619 Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string. 620 621 :param str datetime_string: The datetime string to convert 622 :param str mode: The mode to return the datetime object in [datetime, iso_string] 623 :param bool raise_error: Raise error if unable to parse else return datetime_string 624 :return datetime/str: The converted datetime object 625 """ 626 try: 627 datetime_object = parser.isoparse(datetime_string) 628 except ValueError as e: 629 if raise_error: 630 raise e 631 return datetime_string 632 633 if mode == "datetime": 634 return datetime_object 635 elif mode == "iso_string": 636 return datetime_object.isoformat()
Bluesky datetime string to datetime object.
Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
Parameters
- str datetime_string: The datetime string to convert
- str mode: The mode to return the datetime object in [datetime, iso_string]
- bool raise_error: Raise error if unable to parse else return datetime_string
Returns
The converted datetime object
639 @staticmethod 640 def get_bsky_link(handle, post_id): 641 """ 642 Get link to Bluesky post 643 """ 644 return f"https://bsky.app/profile/{handle}/post/{post_id}"
Get link to Bluesky post
646 @staticmethod 647 def bsky_get_handle_from_did(client, did): 648 """ 649 Get handle from DID 650 """ 651 tries = 0 652 while True: 653 try: 654 user_profile = client.app.bsky.actor.get_profile({"actor": did}) 655 if user_profile: 656 return user_profile.handle 657 else: 658 return None 659 except InvokeTimeoutError as e: 660 # Network error; try again 661 tries += 1 662 time.sleep(1) 663 if tries > 3: 664 return None 665 continue 666 except BadRequestError as e: 667 if e.response.content.message: 668 return e.response.content.message 669 return None
Get handle from DID
671 @staticmethod 672 def bsky_login(username, password, session_id): 673 """ 674 Login to Bluesky 675 676 :param str username: Username for Bluesky 677 :param str password: Password for Bluesky 678 :param str session_id: Session ID to use for login 679 :return Client: Client object with login credentials 680 """ 681 if not session_id: 682 session_id = SearchBluesky.create_session_id(username, password) 683 elif (not username or not password) and not session_id: 684 raise ValueError("Must provide both username and password or else session_id.") 685 686 session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), "bsky_" + session_id + ".session") 687 688 def on_session_change(event: SessionEvent, session: Session) -> None: 689 """ 690 Save session to file; atproto session change event handler should keep the session up to date 691 692 https://atproto.blue/en/latest/atproto_client/auth.html 693 """ 694 print('Session changed:', event, repr(session)) 695 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 696 print('Saving changed session') 697 with session_path.open("w") as session_file: 698 session_file.write(session.export()) 699 700 client = Client() 701 client.on_session_change(on_session_change) 702 if session_path.exists(): 703 with session_path.open() as session_file: 704 session_string = session_file.read() 705 client.login(session_string=session_string) 706 else: 707 # Were not able to log in via session string; login with username and password 708 client.login(login=username, password=password) 709 return client
Login to Bluesky
Parameters
- str username: Username for Bluesky
- str password: Password for Bluesky
- str session_id: Session ID to use for login
Returns
Client object with login credentials
711 @staticmethod 712 def create_session_id(username, password): 713 """ 714 Generate a filename for the session file 715 716 This is a combination of username and password, but hashed 717 so that one cannot actually derive someone's information. 718 719 :param str username: Username for Bluesky 720 :param str password: Password for Bluesky 721 :return str: A hash value derived from the input 722 """ 723 hash_base = username.strip() + str(password).strip() 724 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Generate a filename for the session file
This is a combination of username and password, but hashed so that one cannot actually derive someone's information.
Parameters
- str username: Username for Bluesky
- str password: Password for Bluesky
Returns
A hash value derived from the input
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor