datasources.bsky.search_bsky
Collect Bluesky posts
"""
Collect Bluesky posts
"""
import hashlib
import time
from datetime import datetime

from dateutil import parser

from atproto import Client, Session, SessionEvent
from atproto_client.exceptions import UnauthorizedError, BadRequestError, InvokeTimeoutError, RequestException, \
    ModelError, NetworkError

from backend.lib.search import Search
from common.lib.exceptions import QueryParametersException, QueryNeedsExplicitConfirmationException, \
    ProcessorInterruptedException
from common.lib.helpers import timify
from common.lib.user_input import UserInput
from common.lib.item_mapping import MappedItem

class SearchBluesky(Search):
    """
    Search for posts in Bluesky
    """
    type = "bsky-search"  # job ID
    category = "Search"  # category
    title = "Bluesky Search"  # title displayed in UI
    description = "Collects Bluesky posts via its API."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    config = {
        "bsky-search.max_results": {
            "type": UserInput.OPTION_TEXT,
            "help": "Maximum results per query",
            "coerce_type": int,
            "min": 0,
            "default": 50000,
            "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited."
        }
    }

    # Strings the handle-lookup helper may return instead of a real handle;
    # downstream code must not treat these as usable handles
    handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"]

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get processor options

        Just updates the description of the entities field based on the
        configured max entities.

        :param DataSet parent_dataset:  An object representing the dataset that
        the processor would be run on
        :param config:  Configuration reader; supplies the configured
        `bsky-search.max_results` limit
        """
        options = {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
                        "and stored there while data is fetched. After the dataset has been created your credentials "
                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
            },
            "username": {
                "type": UserInput.OPTION_TEXT,
                "help": "Bluesky Username",
                "cache": True,
                "sensitive": True,
                "tooltip": "If no server is specified, .bsky.social is used."
            },
            "password": {
                "type": UserInput.OPTION_TEXT,
                "help": "Bluesky Password",
                "cache": True,  # tells the frontend to cache this value
                "sensitive": True,  # tells the backend to delete this value after use
                "password": True,  # tells the frontend this is a password type
            },
            "divider": {
                "type": UserInput.OPTION_DIVIDER
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Search Queries",
                "tooltip": "Separate with commas or line breaks."
            },
            "max_posts": {
                "type": UserInput.OPTION_TEXT,
                "help": "Max posts per query",
                "min": 1,
                "default": 100
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range",
                "tooltip": "The date range for the search. No date range will search all posts."
            },
        }

        # Update the max_posts setting from config
        max_posts = int(config.get('bsky-search.max_results', default=100))
        if max_posts == 0:
            # This is potentially madness
            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
            options['max_posts']['min'] = 0
        else:
            options["max_posts"]["max"] = max_posts
            options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts

        return options

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Bluesky query

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param config:  Configuration reader
        :return dict:  Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("username", None) or not query.get("password", None):
            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")

        # If no server is specified, default to .bsky.social
        if "." not in query.get("username"):
            query["username"] += ".bsky.social"
        # Remove @ at the start
        if query.get("username").startswith("@"):
            query["username"] = query["username"][1:]

        # Test login credentials
        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
        try:
            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id,
                                     session_directory=config.get("PATH_ROOT").joinpath(config.get("PATH_SESSIONS")))
        except UnauthorizedError:
            raise QueryParametersException("Invalid Bluesky login credentials.")
        except RequestException as e:
            if e.response.content.message == 'Rate Limit Exceeded':
                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
            else:
                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")

        # sanitize query
        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")
        if min_date and max_date and min_date > max_date:
            raise QueryParametersException("The start date must be before the end date.")

        # Only check this if not already confirmed by the frontend
        posts_per_second = 55  # gathered from simply checking start/end times of logs
        if not query.get("frontend-confirm"):
            # Estimate is not returned; use max_posts as a rough estimate
            max_posts = query.get("max_posts", 100)
            expected_tweets = max_posts * len(sanitized_query)
            # Warn if process may take more than ~1 hours
            if expected_tweets > (posts_per_second * 3600):
                expected_time = timify(expected_tweets / posts_per_second)
                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
            elif max_posts == 0 and not min_date:
                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
            elif max_posts == 0:
                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")

        return {
            "max_posts": query.get("max_posts"),
            "query": ",".join(sanitized_query),
            "username": query.get("username"),
            "password": query.get("password"),
            "session_id": session_id,
            "min_date": min_date,
            "max_date": max_date,
        }

    def get_items(self, query):
        """
        Execute a query; get messages for given parameters

        Yields each collected post as a dict (atproto model dump plus a
        "4CAT_metadata" key added during collection).

        :param dict query:  Query parameters, as part of the DataSet object
        :return generator:  Yields post dicts in the order collected
        """
        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")

        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
        try:
            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"),
                                              session_id=session_id,
                                              session_directory=self.config.get("PATH_ROOT").joinpath(self.config.get("PATH_SESSIONS")))
        except (UnauthorizedError, RequestException, BadRequestError) as e:
            self.dataset.log(f"Bluesky login failed: {e}")
            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")

        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")

        max_posts = query.get("max_posts", 100)
        # API page size caps at 100; 0 means "unlimited", which also uses full pages
        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts

        # Handle reference mapping; user references use did instead of dynamic handle
        did_to_handle = {}

        query_parameters = {
            "limit": limit,
        }

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
        if self.parameters.get("max_date"):
            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')

        queries = query.get("query").split(",")
        num_queries = len(queries)
        total_posts = 0
        i = 0
        last_query = None
        last_date = None
        while queries:
            query = queries.pop(0)
            if query == last_query:
                # Check if there are continued posts from the last query
                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
            else:
                # New query
                query_post_ids = set()
                i += 1
                rank = 0
                last_query = query
                last_date = None
                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
                query_requests = 0

            query_parameters["q"] = query
            cursor = None  # Start with no cursor (first page)
            search_for_invalid_post = False
            invalid_post_counter = 0
            while True:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
                # Query posts, including pagination (cursor for next page)
                tries = 0
                response = None
                while tries < 3:
                    query_parameters["cursor"] = cursor
                    try:
                        response = client.app.bsky.feed.search_posts(params=query_parameters)
                        break
                    except ModelError as e:
                        # Post validation error; one post is unable to be read
                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
                        # order to collect post by post, invalid post is identified again, we switch back to higher
                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
                        # add the query back to the queue with a new "until" date to continue the query
                        # https://github.com/bluesky-social/atproto/issues/3446
                        if not search_for_invalid_post:
                            # New invalid post, search and skip
                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
                            search_for_invalid_post = True
                            # Currently we must search post by post to find the invalid post
                            query_parameters["limit"] = 1
                        else:
                            # Found invalid post, skip, reset counters
                            self.dataset.log(
                                f"Invalid post identified; skipping and continue with query as normal: {e}")
                            search_for_invalid_post = False
                            # Reset limit to normal
                            query_parameters["limit"] = limit
                            invalid_post_counter = 0
                        cursor = str(int(cursor) + 1) if cursor else None
                        # Re-query with new cursor & limit
                        continue

                    except InvokeTimeoutError as e:
                        # Timeout error, but can occur for odd queries with no results
                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
                        time.sleep(1)
                        tries += 2
                        continue
                    except NetworkError as e:
                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor
                        # limit); seems to occur around the 10k mark instead of just a missing cursor as normal
                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
                        time.sleep(1 + (tries * 10))
                        queries.insert(0, query)
                        break

                if not response:
                    # Expected from NetworkError, but the query will have been added back to the queue
                    # If not, then there was a problem with the query
                    if len(queries) == 0:
                        self.dataset.update_status("Error collecting posts from Bluesky; see log for details", is_final=True)
                    if query not in queries:
                        # Query was not added back; there was an unexpected issue with the query itself
                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
                    break

                query_requests += 1
                items = response['posts'] if hasattr(response, 'posts') else []

                if search_for_invalid_post:
                    invalid_post_counter += 1
                    if invalid_post_counter >= 100:
                        # Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
                        self.dataset.log("Unable to identify invalid post; discontinuing search")
                        query_parameters["limit"] = limit
                        search_for_invalid_post = False
                        invalid_post_counter = 0

                if not items:
                    # Sometimes no post is returned, but there still may be posts following
                    self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
                    # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
                    cursor = str(int(cursor) + 1) if cursor else None
                    continue

                new_posts = 0
                # Handle the posts
                for item in items:
                    if 0 < max_posts <= rank:
                        break

                    if self.interrupted:
                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")

                    post = item.model_dump()
                    post_id = post["uri"]
                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
                    if post_id in query_post_ids:
                        # Skip duplicate posts
                        continue

                    new_posts += 1
                    query_post_ids.add(post_id)

                    # Add user handles from references
                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
                    # Mentions
                    mentions = []
                    if post["record"].get("facets"):
                        for facet in post["record"]["facets"]:
                            for feature in facet.get("features", {}):
                                if feature.get("did"):
                                    if feature["did"] in did_to_handle:
                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
                                    else:
                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
                                        if handle:
                                            if handle.lower() in self.handle_lookup_error_messages:
                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
                                            mentions.append({"did": feature["did"], "handle": handle})
                                            did_to_handle[feature["did"]] = handle
                                        else:
                                            mentions.append({"did": feature["did"], "handle": None})
                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
                    # Reply to
                    reply_to_handle = None
                    if post["record"].get("reply"):
                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
                        if reply_to_did in did_to_handle:
                            reply_to_handle = did_to_handle[reply_to_did]
                        else:
                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
                            if handle:
                                if handle.lower() in self.handle_lookup_error_messages:
                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
                                reply_to_handle = handle
                                did_to_handle[reply_to_did] = handle
                            else:
                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")

                    post.update({"4CAT_metadata": {
                        "collected_at": datetime.now().timestamp(),
                        "query": query,
                        "rank": rank,
                        "mentions": mentions,
                        "reply_to": reply_to_handle if reply_to_handle else None,
                    }})
                    rank += 1
                    yield post
                    total_posts += 1

                # Check if there is a cursor for the next page
                cursor = response['cursor']
                # NOTE: max(1, ...) guards against ZeroDivisionError when 0 < max_posts < 10
                if max_posts != 0 and rank % max(1, max_posts // 10) == 0:
                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected out of {max_posts}")
                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
                elif max_posts == 0 and rank % 1000 == 0:
                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected")

                if 0 < max_posts <= rank:
                    self.dataset.update_status(
                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
                    break

                if not cursor:
                    if new_posts:
                        # Bluesky API seems to stop around 10000 posts and not return a cursor
                        # Re-query with the same query to get the next set of posts using last_date (set above)
                        self.dataset.log(f"Query {query}: {query_requests} requests")
                        queries.insert(0, query)
                    else:
                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")

                    if rank:
                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
                    break  # No more pages, stop the loop
                elif not items:
                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
                    break

    @staticmethod
    def map_item(item):
        """
        Convert item object to 4CAT-ready data object

        :param dict item:  item to parse
        :return dict:  4CAT-compatible item object
        """
        unmapped_data = []

        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"

        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at", item["record"].get("createdAt")))

        # Tags
        tags = set()
        links = set()
        mentions_did = set()
        has_poll = False
        if item["record"].get("facets"):
            for facet in item["record"].get("facets"):
                # NOTE: "or []" guards against facets without a "features" key, so the
                # unmapped_data fallback below can actually record them instead of crashing
                for feature in facet.get("features") or []:
                    if feature.get("tag"):
                        tags.add(feature.get("tag"))
                    elif feature.get("uri"):
                        links.add(feature.get("uri"))
                    elif feature.get("did"):
                        mentions_did.add(feature.get("did"))
                    elif feature.get("number"):
                        has_poll = True
                    else:
                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
                if "features" not in facet:
                    unmapped_data.append({"loc": "record.facets", "obj": facet})

        # Embeds are in both the item and the record; so far these always contain same item
        embeded_links = set()
        embeded_images = set()
        image_references = set()
        quoted_link = None
        quoted_user = None
        quoted_ref = None
        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
        while possible_embeds:
            embed = possible_embeds.pop(0)
            if not embed:
                continue

            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
                # contains post plus additional media
                for key, media_ob in embed.items():
                    possible_embeds.append(media_ob)

            elif "images" in embed:  # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
                for img_ob in embed["images"]:
                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
                    if img_link:
                        embeded_images.add(img_link)
                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
                        # BUT ref has already been obtained in other embeds...
                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
                    else:
                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
                if embed.get("thumbnail"):
                    embeded_images.add(embed.get("thumbnail"))
                elif embed.get("video", {}).get("ref", {}).get("link"):
                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
                else:
                    # No thumb for video
                    pass
            elif "record" in embed:  # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
                # Quoted post
                # Note: these may also contain images that would be seen, but are not part of the original post
                if embed["record"].get("author", embed["record"].get("creator")):
                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
                        # User may not be able to see original post
                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
                        else:
                            # New unknown
                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
                    else:
                        # author seems to be a quoted post while creator a quoted "list"
                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
                elif embed["record"].get("not_found"):
                    quoted_link = "DELETED"
                    # We do have the DID, but this information is not normally displayed
                    # quoted_user = embed["record"]['uri'].split("/")[2]
                elif embed["record"].get("detached"):
                    quoted_link = "REMOVED BY AUTHOR"
                else:
                    quoted_ref = embed["record"]['uri']
            elif "external" in embed:  # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
                if embed["external"].get("uri"):
                    embeded_links.add(embed["external"].get("uri"))
                if embed["external"].get("thumb"):
                    if isinstance(embed["external"]["thumb"], str):
                        embeded_images.add(embed["external"]["thumb"])
                    else:
                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
                else:
                    # No thumb for link
                    pass
            else:
                unmapped_data.append({"loc": f"embed.{py_type}",
                                      "obj": embed})

        # Replies allowed
        # https://docs.bsky.app/docs/tutorials/thread-gates
        # threadgate object does not appear to differentiate between the types of replies allowed
        replies_allowed = True if not item["threadgate"] else False

        # Labels (content moderation)
        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
        if item["record"].get("labels"):
            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values", [])])

        # Language
        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))

        # Missing references
        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
        if quoted_ref:
            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})

        # Reference Posts (expanded to include handles during collection)
        # Note: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
        # Mentions
        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
        # Reply to
        replied_to_post = None
        replied_to_user = None
        if item["record"].get("reply"):
            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
                replied_to_user = item["4CAT_metadata"]["reply_to"]
            else:
                # Use DID, though this will not create a working link
                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])

        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also
        # in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
        # if item["record"].get("entities"):
        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})

        # Author tags, not hashtags, not seen, very rarely used
        # if item["record"].get("tags"):
        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})

        return MappedItem({
            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
            "query": item["4CAT_metadata"]["query"],
            "rank": item["4CAT_metadata"]["rank"],
            "id": item["uri"],
            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
            "created_at": created_at.isoformat(),
            "author": item["author"]["handle"],
            "author_id": item["author"]["did"],
            "body": item["record"]["text"],
            "link": link,
            "tags": ",".join(tags),
            "like_count": item["like_count"],
            "quote_count": item["quote_count"],
            "reply_count": item["reply_count"],
            "repost_count": item["repost_count"],
            "quoted_post": quoted_link if quoted_link else "",
            "quoted_user": quoted_user if quoted_user else "",
            "replied_to_post": replied_to_post if replied_to_post else "",
            "replied_to_user": replied_to_user if replied_to_user else "",
            "replies_allowed": replies_allowed,
            "mentions": ",".join(mentions),
            "links": ",".join(embeded_links | links),
            "images": ",".join(embeded_images),
            "labels": ",".join(labels),
            "has_poll": has_poll,
            "languages": languages,

            "author_display_name": item["author"]["display_name"],
            "author_profile": author_profile,
            "author_avatar": item["author"]["avatar"],
            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),

            "timestamp": int(created_at.timestamp()),
        }, message=f"Bluesky new mappings: {unmapped_data}")

    @staticmethod
    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
        """
        Bluesky datetime string to datetime object.

        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.

        :param str datetime_string:  The datetime string to convert
        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
        :param bool raise_error:  Raise error if unable to parse else return datetime_string
        :return datetime/str:  The converted datetime object
        """
        try:
            datetime_object = parser.isoparse(datetime_string)
        except ValueError as e:
            if raise_error:
                raise e
            return datetime_string

        if mode == "datetime":
            return datetime_object
        elif mode == "iso_string":
            return datetime_object.isoformat()

    @staticmethod
    def get_bsky_link(handle, post_id):
        """
        Get link to Bluesky post
        """
        return f"https://bsky.app/profile/{handle}/post/{post_id}"

    @staticmethod
    def bsky_get_handle_from_did(client, did):
        """
        Get handle from DID

        :param Client client:  Logged-in atproto client
        :param str did:  DID to look up
        :return str/None:  The handle, an error message from the API, or None
        """
        tries = 0
        while True:
            try:
                user_profile = client.app.bsky.actor.get_profile({"actor": did})
                if user_profile:
                    return user_profile.handle
                else:
                    return None
            except (NetworkError, InvokeTimeoutError):
                # Network error; try again
                tries += 1
                time.sleep(1)
                if tries > 3:
                    return None
                continue
            except BadRequestError as e:
                # e.g. "account is deactivated"; surfaced so callers can detect it
                if e.response.content.message:
                    return e.response.content.message
                return None

    @staticmethod
    def bsky_login(username, password, session_id, session_directory):
        """
        Login to Bluesky

        :param str username:  Username for Bluesky
        :param str password:  Password for Bluesky
        :param str session_id:  Session ID to use for login
        :param Path session_directory:  Directory to save the session file
        :return Client:  Client object with login credentials
        """
        if not session_id:
            # Without a session ID we need credentials to derive one; previously this
            # check was unreachable and crashed with AttributeError instead
            if not username or not password:
                raise ValueError("Must provide both username and password or else session_id.")
            session_id = SearchBluesky.create_session_id(username, password)

        session_path = session_directory.joinpath("bsky_" + session_id + ".session")

        def on_session_change(event: SessionEvent, session: Session) -> None:
            """
            Save session to file; atproto session change event handler should keep the session up to date

            https://atproto.blue/en/latest/atproto_client/auth.html
            """
            print('Session changed:', event, repr(session))
            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
                print('Saving changed session')
                with session_path.open("w") as session_file:
                    session_file.write(session.export())

        client = Client()
        client.on_session_change(on_session_change)
        if session_path.exists():
            with session_path.open() as session_file:
                session_string = session_file.read()
            try:
                client.login(session_string=session_string)
            except BadRequestError as e:
                if e.response.content.message == 'Token has expired':
                    # Token has expired; try to refresh
                    if username and password:
                        client.login(login=username, password=password)
                    else:
                        raise ValueError("Session token has expired; please re-login with username and password.")
                else:
                    # Were not able to log in via session string; login with username and password
                    client.login(login=username, password=password)
        else:
            # No stored session; login with username and password (session file is
            # then created by the on_session_change handler)
            client.login(login=username, password=password)
        return client

    @staticmethod
    def create_session_id(username, password):
        """
        Generate a filename for the session file

        This is a combination of username and password, but hashed
        so that one cannot actually derive someone's information.

        :param str username:  Username for Bluesky
        :param str password:  Password for Bluesky
        :return str:  A hash value derived from the input
        """
        hash_base = username.strip() + str(password).strip()
        # utf-8 is byte-identical to ascii for ascii input (same hashes as before),
        # but does not crash on non-ascii credentials
        return hashlib.blake2b(hash_base.encode("utf-8")).hexdigest()
class SearchBluesky(Search):
    """
    Search for posts in Bluesky
    """
    type = "bsky-search"  # job ID
    category = "Search"  # category
    title = "Bluesky Search"  # title displayed in UI
    description = "Collects Bluesky posts via its API."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    config = {
        "bsky-search.max_results": {
            "type": UserInput.OPTION_TEXT,
            "help": "Maximum results per query",
            "coerce_type": int,
            "min": 0,
            "default": 50000,
            "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited."
        }
    }

    # Lowercased messages the profile-lookup endpoint returns instead of a handle
    # when an account cannot be resolved; used to detect failed lookups
    handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"]

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get processor options

        Just updates the description of the entities field based on the
        configured max entities.

        :param DataSet parent_dataset: An object representing the dataset that
        the processor would be run on
        :param config: 4CAT configuration reader, used to read the configured
        maximum number of results per query ("bsky-search.max_results").
        """
        options = {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
                        "and stored there while data is fetched. After the dataset has been created your credentials "
                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
            },
            "username": {
                "type": UserInput.OPTION_TEXT,
                "help": "Bluesky Username",
                "cache": True,
                "sensitive": True,
                "tooltip": "If no server is specified, .bsky.social is used."
            },
            "password": {
                "type": UserInput.OPTION_TEXT,
                "help": "Bluesky Password",
                "cache": True,  # tells the frontend to cache this value
                "sensitive": True,  # tells the backend to delete this value after use
                "password": True,  # tells the frontend this is a password type
            },
            "divider": {
                "type": UserInput.OPTION_DIVIDER
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Search Queries",
                "tooltip": "Separate with commas or line breaks."
            },
            "max_posts": {
                "type": UserInput.OPTION_TEXT,
                "help": "Max posts per query",
                "min": 1,
                "default": 100
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range",
                "tooltip": "The date range for the search. No date range will search all posts."
            },
        }

        # Update the max_posts setting from config
        max_posts = int(config.get('bsky-search.max_results', default=100))
        if max_posts == 0:
            # This is potentially madness
            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
            options['max_posts']['min'] = 0
        else:
            options["max_posts"]["max"] = max_posts
            # Clamp the default so it never exceeds the configured maximum
            options['max_posts']['default'] = min(options['max_posts']['default'], max_posts)

        return options

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Bluesky query

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param config:  4CAT configuration reader; used to locate the session
        directory when testing the login credentials
        :return dict:  Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("username", None) or not query.get("password", None):
            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")

        # If no server is specified, default to .bsky.social
        if "." not in query.get("username"):
            query["username"] += ".bsky.social"
        # Remove @ at the start
        if query.get("username").startswith("@"):
            query["username"] = query["username"][1:]

        # Test login credentials
        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
        try:
            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id, session_directory=config.get("PATH_ROOT").joinpath(config.get("PATH_SESSIONS")))
        except UnauthorizedError:
            raise QueryParametersException("Invalid Bluesky login credentials.")
        except RequestException as e:
            if e.response.content.message == 'Rate Limit Exceeded':
                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
            else:
                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")

        # sanitize query
        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")
        if min_date and max_date and min_date > max_date:
            raise QueryParametersException("The start date must be before the end date.")

        # Only check this if not already confirmed by the frontend
        posts_per_second = 55  # gathered from simply checking start/end times of logs
        if not query.get("frontend-confirm"):
            # Estimate is not returned; use max_posts as a rough estimate
            max_posts = query.get("max_posts", 100)
            expected_tweets = max_posts * len(sanitized_query)
            # Warn if process may take more than ~1 hours
            if expected_tweets > (posts_per_second * 3600):
                expected_time = timify(expected_tweets / posts_per_second)
                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
            elif max_posts == 0 and not min_date:
                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
            elif max_posts == 0:
                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")

        return {
            "max_posts": query.get("max_posts"),
            "query": ",".join(sanitized_query),
            "username": query.get("username"),
            "password": query.get("password"),
            "session_id": session_id,
            "min_date": min_date,
            "max_date": max_date,
        }

    def get_items(self, query):
        """
        Execute a query; get messages for given parameters

        Runs each comma-separated query against the Bluesky search API and
        yields posts one by one (this is a generator, consumed by 4CAT's
        search scaffolding).

        :param dict query:  Query parameters, as part of the DataSet object
        :return generator:  Yields post dicts (atproto model dumps augmented
        with a "4CAT_metadata" key)
        """
        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")

        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
        try:
            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id, session_directory=self.config.get("PATH_ROOT").joinpath(self.config.get("PATH_SESSIONS")))
        except (UnauthorizedError, RequestException, BadRequestError) as e:
            self.dataset.log(f"Bluesky login failed: {e}")
            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")

        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")

        max_posts = query.get("max_posts", 100)
        # API maximum page size is 100; 0 means "unlimited" so use full pages
        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts

        # Handle reference mapping; user references use did instead of dynamic handle
        did_to_handle = {}

        query_parameters = {
            "limit": limit,
        }

        # Add start and end dates if provided
        if self.parameters.get("min_date"):
            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
        if self.parameters.get("max_date"):
            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')

        queries = query.get("query").split(",")
        num_queries = len(queries)
        total_posts = 0
        i = 0
        last_query = None
        last_date = None
        while queries:
            # NOTE: named current_query so the `query` parameter dict is not shadowed
            current_query = queries.pop(0)
            if current_query == last_query:
                # Check if there are continued posts from the last query
                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
                self.dataset.log(f"Continuing query ({i} of {num_queries}): {current_query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
            else:
                # New query
                query_post_ids = set()
                i += 1
                rank = 0
                last_query = current_query
                last_date = None
                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {current_query}")
            query_requests = 0

            query_parameters["q"] = current_query
            cursor = None  # Start with no cursor (first page)
            search_for_invalid_post = False
            invalid_post_counter = 0
            while True:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
                # Query posts, including pagination (cursor for next page)
                tries = 0
                response = None
                while tries < 3:
                    query_parameters["cursor"] = cursor
                    try:
                        response = client.app.bsky.feed.search_posts(params=query_parameters)
                        break
                    except ModelError as e:
                        # Post validation error; one post is unable to be read
                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
                        # order to collect post by post, invalid post is identified again, we switch back to higher
                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
                        # add the query back to the queue with a new "until" date to continue the query
                        # https://github.com/bluesky-social/atproto/issues/3446
                        if not search_for_invalid_post:
                            # New invalid post, search and skip
                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
                            search_for_invalid_post = True
                            # Currently we must search post by post to find the invalid post
                            query_parameters["limit"] = 1
                        else:
                            # Found invalid post, skip, reset counters
                            self.dataset.log(
                                f"Invalid post identified; skipping and continue with query as normal: {e}")
                            search_for_invalid_post = False
                            # Reset limit to normal
                            query_parameters["limit"] = limit
                            invalid_post_counter = 0
                            cursor = str(int(cursor) + 1) if cursor else None
                        # Re-query with new cursor & limit
                        continue

                    except InvokeTimeoutError as e:
                        # Timeout error, but can occur for odd queries with no results
                        self.dataset.log(f"Bluesky request error for query {current_query}: {e}")
                        time.sleep(1)
                        tries += 2
                        continue
                    except NetworkError as e:
                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
                        self.dataset.log(f"Bluesky network error for query {current_query}; retrying: {e}")
                        time.sleep(1 + (tries * 10))
                        queries.insert(0, current_query)
                        break

                if not response:
                    # Expected from NetworkError, but the query will have been added back to the queue
                    # If not, then there was a problem with the query
                    if len(queries) == 0:
                        self.dataset.update_status("Error collecting posts from Bluesky; see log for details", is_final=True)
                    if current_query not in queries:
                        # Query was not added back; there was an unexpected issue with the query itself
                        self.dataset.update_status(f"Error continuing {current_query} from Bluesky (see log for details); continuing to next query")
                    break

                query_requests += 1
                items = response['posts'] if hasattr(response, 'posts') else []

                if search_for_invalid_post:
                    invalid_post_counter += 1
                    if invalid_post_counter >= 100:
                        # Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
                        self.dataset.log("Unable to identify invalid post; discontinuing search")
                        query_parameters["limit"] = limit
                        search_for_invalid_post = False
                        invalid_post_counter = 0

                if not items:
                    # Sometimes no post is returned, but there still may be posts following
                    self.dataset.log(f"Query {current_query} w/ params {query_parameters} returned no posts: {response}")
                    # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
                    cursor = str(int(cursor) + 1) if cursor else None
                    continue

                new_posts = 0
                # Handle the posts
                for item in items:
                    if 0 < max_posts <= rank:
                        break

                    if self.interrupted:
                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")

                    post = item.model_dump()
                    post_id = post["uri"]
                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
                    if post_id in query_post_ids:
                        # Skip duplicate posts
                        continue

                    new_posts += 1
                    query_post_ids.add(post_id)

                    # Add user handles from references
                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
                    # Mentions
                    mentions = []
                    if post["record"].get("facets"):
                        for facet in post["record"]["facets"]:
                            for feature in facet.get("features", {}):
                                if feature.get("did"):
                                    if feature["did"] in did_to_handle:
                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
                                    else:
                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
                                        if handle:
                                            if handle.lower() in self.handle_lookup_error_messages:
                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
                                            mentions.append({"did": feature["did"], "handle": handle})
                                            did_to_handle[feature["did"]] = handle
                                        else:
                                            mentions.append({"did": feature["did"], "handle": None})
                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
                    # Reply to
                    reply_to_handle = None
                    if post["record"].get("reply"):
                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
                        if reply_to_did in did_to_handle:
                            reply_to_handle = did_to_handle[reply_to_did]
                        else:
                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
                            if handle:
                                if handle.lower() in self.handle_lookup_error_messages:
                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
                                reply_to_handle = handle
                                did_to_handle[reply_to_did] = handle
                            else:
                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")

                    post.update({"4CAT_metadata": {
                        "collected_at": datetime.now().timestamp(),
                        "query": current_query,
                        "rank": rank,
                        "mentions": mentions,
                        "reply_to": reply_to_handle if reply_to_handle else None,
                    }})
                    rank += 1
                    yield post
                    total_posts += 1

                # Check if there is a cursor for the next page
                cursor = response['cursor']
                # max(..., 1) avoids modulo-by-zero when max_posts < 10
                if max_posts != 0 and rank % max(max_posts // 10, 1) == 0:
                    self.dataset.update_status(f"Progress query {current_query}: {rank} posts collected out of {max_posts}")
                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
                elif max_posts == 0 and rank % 1000 == 0:
                    self.dataset.update_status(f"Progress query {current_query}: {rank} posts collected")

                if 0 < max_posts <= rank:
                    self.dataset.update_status(
                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {current_query}")
                    break

                if not cursor:
                    if new_posts:
                        # Bluesky API seems to stop around 10000 posts and not return a cursor
                        # Re-query with the same query to get the next set of posts using last_date (set above)
                        self.dataset.log(f"Query {current_query}: {query_requests} requests")
                        queries.insert(0, current_query)
                    else:
                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
                        self.dataset.log(f"Query {current_query}: {query_requests} requests; no additional posts returned")

                        if rank:
                            self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {current_query}")
                    break  # No more pages, stop the loop
                elif not items:
                    self.dataset.log(f"Query {current_query}: {query_requests} requests; no additional posts returned")
                    break

    @staticmethod
    def map_item(item):
        """
        Convert item object to 4CAT-ready data object

        :param dict item:  item to parse
        :return dict:  4CAT-compatible item object
        """
        unmapped_data = []

        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"

        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at", item["record"].get("createdAt")))

        # Tags
        tags = set()
        links = set()
        mentions_did = set()
        has_poll = False
        if item["record"].get("facets"):
            for facet in item["record"].get("facets"):
                for feature in facet.get("features"):
                    if feature.get("tag"):
                        tags.add(feature.get("tag"))
                    elif feature.get("uri"):
                        links.add(feature.get("uri"))
                    elif feature.get("did"):
                        mentions_did.add(feature.get("did"))
                    elif feature.get("number"):
                        has_poll = True
                    else:
                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
                if "features" not in facet:
                    unmapped_data.append({"loc": "record.facets", "obj": facet})

        # Embeds are in both the item and the record; so far these always contain same item
        embeded_links = set()
        embeded_images = set()
        image_references = set()
        quoted_link = None
        quoted_user = None
        quoted_ref = None
        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
        while possible_embeds:
            embed = possible_embeds.pop(0)
            if not embed:
                continue

            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
                # contains post plus additional media; queue each nested embed for processing
                for media_ob in embed.values():
                    possible_embeds.append(media_ob)

            elif "images" in embed:  # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
                for img_ob in embed["images"]:
                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
                    if img_link:
                        embeded_images.add(img_link)
                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
                        # BUT ref has already been obtained in other embeds...
                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
                    else:
                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
                if embed.get("thumbnail"):
                    embeded_images.add(embed.get("thumbnail"))
                elif embed.get("video", {}).get("ref", {}).get("link"):
                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
                else:
                    # No thumb for video
                    pass
            elif "record" in embed:  # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
                # Quoted post
                # Note: these may also contain images that would be seen, but are not part of the original post
                if embed["record"].get("author", embed["record"].get("creator")):
                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
                        # User may not be able to see original post
                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
                        else:
                            # New unknown
                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
                    else:
                        # author seems to be a quoted post while creator a quoted "list"
                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
                elif embed["record"].get("not_found"):
                    quoted_link = "DELETED"
                    # We do have the DID, but this information is not normally displayed
                    # quoted_user = embed["record"]['uri'].split("/")[2]
                elif embed["record"].get("detached"):
                    quoted_link = "REMOVED BY AUTHOR"
                else:
                    quoted_ref = embed["record"]['uri']
            elif "external" in embed:  # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
                if embed["external"].get("uri"):
                    embeded_links.add(embed["external"].get("uri"))
                if embed["external"].get("thumb"):
                    if isinstance(embed["external"]["thumb"], str):
                        embeded_images.add(embed["external"]["thumb"])
                    else:
                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
                else:
                    # No thumb for link
                    pass
            else:
                unmapped_data.append({"loc": f"embed.{py_type}",
                                      "obj": embed})

        # Replies allowed
        # https://docs.bsky.app/docs/tutorials/thread-gates
        # threadgate object does not appear to differentiate between the types of replies allowed
        replies_allowed = not item["threadgate"]

        # Labels (content moderation)
        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
        if item["record"].get("labels"):
            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values", [])])

        # Language
        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))

        # Missing references
        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
        if quoted_ref:
            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})

        # Reference Posts (expanded to include handles during collection)
        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
        # Mentions
        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
        # Reply to
        replied_to_post = None
        replied_to_user = None
        if item["record"].get("reply"):
            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
                replied_to_user = item["4CAT_metadata"]["reply_to"]
            else:
                # Use DID, though this will not create a working link
                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])

        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
        # if item["record"].get("entities"):
        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})

        # Author tags, not hashtags, not seen, very rarely used
        # if item["record"].get("tags"):
        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})

        return MappedItem({
            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
            "query": item["4CAT_metadata"]["query"],
            "rank": item["4CAT_metadata"]["rank"],
            "id": item["uri"],
            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
            "created_at": created_at.isoformat(),
            "author": item["author"]["handle"],
            "author_id": item["author"]["did"],
            "body": item["record"]["text"],
            "link": link,
            "tags": ",".join(tags),
            "like_count": item["like_count"],
            "quote_count": item["quote_count"],
            "reply_count": item["reply_count"],
            "repost_count": item["repost_count"],
            "quoted_post": quoted_link if quoted_link else "",
            "quoted_user": quoted_user if quoted_user else "",
            "replied_to_post": replied_to_post if replied_to_post else "",
            "replied_to_user": replied_to_user if replied_to_user else "",
            "replies_allowed": replies_allowed,
            "mentions": ",".join(mentions),
            "links": ",".join(embeded_links | links),
            "images": ",".join(embeded_images),
            "labels": ",".join(labels),
            "has_poll": has_poll,
            "languages": languages,

            "author_display_name": item["author"]["display_name"],
            "author_profile": author_profile,
            "author_avatar": item["author"]["avatar"],
            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),

            "timestamp": int(created_at.timestamp()),
        }, message=f"Bluesky new mappings: {unmapped_data}")

    @staticmethod
    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
        """
        Bluesky datetime string to datetime object.

        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.

        :param str datetime_string:  The datetime string to convert
        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
        :param bool raise_error:  Raise error if unable to parse else return datetime_string
        :return datetime/str:  The converted datetime object
        """
        try:
            datetime_object = parser.isoparse(datetime_string)
        except ValueError as e:
            if raise_error:
                raise e
            return datetime_string

        if mode == "datetime":
            return datetime_object
        elif mode == "iso_string":
            return datetime_object.isoformat()

    @staticmethod
    def get_bsky_link(handle, post_id):
        """
        Get link to Bluesky post
        """
        return f"https://bsky.app/profile/{handle}/post/{post_id}"

    @staticmethod
    def bsky_get_handle_from_did(client, did):
        """
        Get handle from DID

        Retries transient network errors a few times; on a BadRequestError the
        API's error message (e.g. "Profile not found") is returned in place of
        a handle, which callers detect via handle_lookup_error_messages.
        """
        tries = 0
        while True:
            try:
                user_profile = client.app.bsky.actor.get_profile({"actor": did})
                if user_profile:
                    return user_profile.handle
                else:
                    return None
            except (NetworkError, InvokeTimeoutError):
                # Network error; try again
                tries += 1
                time.sleep(1)
                if tries > 3:
                    return None
                continue
            except BadRequestError as e:
                if e.response.content.message:
                    return e.response.content.message
                return None

    @staticmethod
    def bsky_login(username, password, session_id, session_directory):
        """
        Login to Bluesky

        :param str username:  Username for Bluesky
        :param str password:  Password for Bluesky
        :param str session_id:  Session ID to use for login
        :param Path session_directory:  Directory to save the session file
        :return Client:  Client object with login credentials
        """
        # Check credentials BEFORE deriving a session ID; previously this check
        # was in an unreachable elif branch and create_session_id could crash on None
        if not session_id and (not username or not password):
            raise ValueError("Must provide both username and password or else session_id.")
        if not session_id:
            session_id = SearchBluesky.create_session_id(username, password)

        session_path = session_directory.joinpath("bsky_" + session_id + ".session")

        def on_session_change(event: SessionEvent, session: Session) -> None:
            """
            Save session to file; atproto session change event handler should keep the session up to date

            https://atproto.blue/en/latest/atproto_client/auth.html
            """
            # NOTE(review): printing repr(session) writes session tokens to stdout;
            # consider logging less sensitive detail here
            print('Session changed:', event, repr(session))
            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
                print('Saving changed session')
                with session_path.open("w") as session_file:
                    session_file.write(session.export())

        client = Client()
        client.on_session_change(on_session_change)
        if session_path.exists():
            with session_path.open() as session_file:
                session_string = session_file.read()
            try:
                client.login(session_string=session_string)
            except BadRequestError as e:
                if e.response.content.message == 'Token has expired':
                    # Token has expired; try to refresh
                    if username and password:
                        client.login(login=username, password=password)
                    else:
                        raise ValueError("Session token has expired; please re-login with username and password.")
                else:
                    # Were not able to log in via session string; login with username and password
                    client.login(login=username, password=password)
        else:
            # No stored session file; log in with the full credentials
            client.login(login=username, password=password)
        return client

    @staticmethod
    def create_session_id(username, password):
        """
        Generate a filename for the session file

        This is a combination of username and password, but hashed
        so that one cannot actually derive someone's information.

        :param str username:  Username for Bluesky
        :param str password:  Password for Bluesky
        :return str:  A hash value derived from the input
        """
        hash_base = username.strip() + str(password).strip()
        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Search for posts in Bluesky
@classmethod
def get_options(cls, parent_dataset=None, config=None):
    """
    Get processor options

    Just updates the description of the entities field based on the
    configured max entities.

    :param DataSet parent_dataset: An object representing the dataset that
    the processor would be run on
    :param config: 4CAT configuration reader, used to read the configured
    maximum number of results per query ("bsky-search.max_results").
    """
    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
                    "and stored there while data is fetched. After the dataset has been created your credentials "
                    "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
        },
        "username": {
            "type": UserInput.OPTION_TEXT,
            "help": "Bluesky Username",
            "cache": True,
            "sensitive": True,
            "tooltip": "If no server is specified, .bsky.social is used."
        },
        "password": {
            "type": UserInput.OPTION_TEXT,
            "help": "Bluesky Password",
            "cache": True,  # tells the frontend to cache this value
            "sensitive": True,  # tells the backend to delete this value after use
            "password": True,  # tells the frontend this is a password type
        },
        "divider": {
            "type": UserInput.OPTION_DIVIDER
        },
        "query": {
            "type": UserInput.OPTION_TEXT_LARGE,
            "help": "Search Queries",
            "tooltip": "Separate with commas or line breaks."
        },
        "max_posts": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max posts per query",
            "min": 1,
            "default": 100
        },
        "daterange": {
            "type": UserInput.OPTION_DATERANGE,
            "help": "Date range",
            "tooltip": "The date range for the search. No date range will search all posts."
        },
    }

    # Update the max_posts setting from config
    max_posts = int(config.get('bsky-search.max_results', default=100))
    if max_posts == 0:
        # This is potentially madness
        options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
        options['max_posts']['min'] = 0
    else:
        options["max_posts"]["max"] = max_posts
        # Clamp the default so it never exceeds the configured maximum
        options['max_posts']['default'] = min(options['max_posts']['default'], max_posts)

    return options
Get processor options
Just updates the description of the entities field based on the configured max entities.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
- User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileged users.
@staticmethod
def validate_query(query, request, config):
    """
    Validate Bluesky query

    :param dict query:  Query parameters, from client-side.
    :param request:  Flask request
    :param config:  4CAT configuration reader; used to locate the session
    directory when testing the login credentials
    :return dict:  Safe query parameters
    """
    # no query 4 u
    if not query.get("query", "").strip():
        raise QueryParametersException("You must provide a search query.")

    if not query.get("username", None) or not query.get("password", None):
        raise QueryParametersException("You need to provide valid Bluesky login credentials first.")

    # If no server is specified, default to .bsky.social
    if "." not in query.get("username"):
        query["username"] += ".bsky.social"
    # Remove @ at the start
    if query.get("username").startswith("@"):
        query["username"] = query["username"][1:]

    # Test login credentials
    session_id = SearchBluesky.create_session_id(query["username"], query["password"])
    try:
        SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id, session_directory=config.get("PATH_ROOT").joinpath(config.get("PATH_SESSIONS")))
    except UnauthorizedError:
        raise QueryParametersException("Invalid Bluesky login credentials.")
    except RequestException as e:
        if e.response.content.message == 'Rate Limit Exceeded':
            lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
            raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
        else:
            raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")

    # sanitize query
    sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]

    # the dates need to make sense as a range to search within
    min_date, max_date = query.get("daterange")
    if min_date and max_date and min_date > max_date:
        raise QueryParametersException("The start date must be before the end date.")

    # Only check this if not already confirmed by the frontend
    posts_per_second = 55  # gathered from simply checking start/end times of logs
    if not query.get("frontend-confirm"):
        # Estimate is not returned; use max_posts as a rough estimate
        max_posts = query.get("max_posts", 100)
        expected_tweets = max_posts * len(sanitized_query)
        # Warn if process may take more than ~1 hours
        if expected_tweets > (posts_per_second * 3600):
            expected_time = timify(expected_tweets / posts_per_second)
            raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
        elif max_posts == 0 and not min_date:
            raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
        elif max_posts == 0:
            raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")

    return {
        "max_posts": query.get("max_posts"),
        "query": ",".join(sanitized_query),
        "username": query.get("username"),
        "password": query.get("password"),
        "session_id": session_id,
        "min_date": min_date,
        "max_date": max_date,
    }
Validate Bluesky query
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- config: Configuration reader with 4CAT settings (note: the documented `user` parameter does not exist in the signature)
Returns
Safe query parameters
185 def get_items(self, query): 186 """ 187 Execute a query; get messages for given parameters 188 189 Basically a wrapper around execute_queries() to call it with asyncio. 190 191 :param dict query: Query parameters, as part of the DataSet object 192 :return list: Posts, sorted by thread and post ID, in ascending order 193 """ 194 if not query.get("session_id") and (not query.get("username") or not query.get("password")): 195 return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.") 196 197 session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"] 198 try: 199 client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id, session_directory=self.config.get("PATH_ROOT").joinpath(self.config.get("PATH_SESSIONS"))) 200 except (UnauthorizedError, RequestException, BadRequestError) as e: 201 self.dataset.log(f"Bluesky login failed: {e}") 202 return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.") 203 204 self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}") 205 206 max_posts = query.get("max_posts", 100) 207 limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts 208 209 # Handle reference mapping; user references use did instead of dynamic handle 210 did_to_handle = {} 211 212 query_parameters = { 213 "limit": limit, 214 } 215 216 # Add start and end dates if provided 217 if self.parameters.get("min_date"): 218 query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ') 219 if self.parameters.get("max_date"): 220 query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ') 221 222 queries = query.get("query").split(",") 223 num_queries = len(queries) 224 
total_posts = 0 225 i = 0 226 last_query = None 227 last_date = None 228 while queries: 229 query = queries.pop(0) 230 if query == last_query: 231 # Check if there are continued posts from the last query 232 query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ') 233 self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}") 234 else: 235 # New query 236 query_post_ids = set() 237 i += 1 238 rank = 0 239 last_query = query 240 last_date = None 241 self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}") 242 query_requests = 0 243 244 query_parameters["q"] = query 245 cursor = None # Start with no cursor (first page) 246 search_for_invalid_post = False 247 invalid_post_counter = 0 248 while True: 249 if self.interrupted: 250 raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API") 251 # Query posts, including pagination (cursor for next page) 252 tries = 0 253 response = None 254 while tries < 3: 255 query_parameters["cursor"] = cursor 256 try: 257 response = client.app.bsky.feed.search_posts(params=query_parameters) 258 break 259 except ModelError as e: 260 # Post validation error; one post is unable to be read 261 # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in 262 # order to collect post by post, invalid post is identified again, we switch back to higher 263 # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError 264 # is raised with detail refering to a server error 502 InternalServerError, we catch that and 265 # add the query back to the queue with a new "until" date to continue the query 266 # https://github.com/bluesky-social/atproto/issues/3446 267 if not search_for_invalid_post: 268 # New invalid post, search and skip 269 self.dataset.log(f"Invalid post detected; searching post by post: {e}") 270 search_for_invalid_post = True 271 # Currently we 
must search post by post to find the invalid post 272 query_parameters["limit"] = 1 273 else: 274 # Found invalid post, skip, reset counters 275 self.dataset.log( 276 f"Invalid post identified; skipping and continue with query as normal: {e}") 277 search_for_invalid_post = False 278 # Reset limit to normal 279 query_parameters["limit"] = limit 280 invalid_post_counter = 0 281 cursor = str(int(cursor) + 1) if cursor else None 282 # Re-query with new cursor & limit 283 continue 284 285 except InvokeTimeoutError as e: 286 # Timeout error, but can occur for odd queries with no results 287 self.dataset.log(f"Bluesky request error for query {query}: {e}") 288 time.sleep(1) 289 tries += 2 290 continue 291 except NetworkError as e: 292 # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal 293 self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}") 294 time.sleep(1 + (tries * 10)) 295 queries.insert(0, query) 296 break 297 298 if not response: 299 # Expected from NetworkError, but the query will have been added back to the queue 300 # If not, then there was a problem with the query 301 if len(queries) == 0: 302 self.dataset.update_status("Error collecting posts from Bluesky; see log for details", is_final=True) 303 if query not in queries: 304 # Query was not added back; there was an unexpected issue with the query itself 305 self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query") 306 break 307 308 query_requests += 1 309 items = response['posts'] if hasattr(response, 'posts') else [] 310 311 if search_for_invalid_post: 312 invalid_post_counter += 1 313 if invalid_post_counter >= 100: 314 # Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely 315 self.dataset.log("Unable to identify invalid post; 
discontinuing search") 316 query_parameters["limit"] = limit 317 search_for_invalid_post = False 318 invalid_post_counter = 0 319 320 if not items: 321 # Sometimes no post is returned, but there still may be posts following 322 self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}") 323 # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised 324 cursor = str(int(cursor) + 1) if cursor else None 325 continue 326 327 new_posts = 0 328 # Handle the posts 329 for item in items: 330 if 0 < max_posts <= rank: 331 break 332 333 if self.interrupted: 334 raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API") 335 336 post = item.model_dump() 337 post_id = post["uri"] 338 # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed 339 last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at")) 340 if post_id in query_post_ids: 341 # Skip duplicate posts 342 continue 343 344 new_posts += 1 345 query_post_ids.add(post_id) 346 347 # Add user handles from references 348 did_to_handle[post["author"]["did"]] = post["author"]["handle"] 349 # Mentions 350 mentions = [] 351 if post["record"].get("facets"): 352 for facet in post["record"]["facets"]: 353 for feature in facet.get("features", {}): 354 if feature.get("did"): 355 if feature["did"] in did_to_handle: 356 mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]}) 357 else: 358 handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"]) 359 if handle: 360 if handle.lower() in self.handle_lookup_error_messages: 361 self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}") 362 mentions.append({"did": feature["did"], "handle": handle}) 363 did_to_handle[feature["did"]] = handle 364 else: 365 mentions.append({"did": feature["did"], "handle": None}) 366 self.dataset.log(f"Bluesky: could 
not lookup the handle for {feature['did']}") 367 # Reply to 368 reply_to_handle = None 369 if post["record"].get("reply"): 370 reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2] 371 if reply_to_did in did_to_handle: 372 reply_to_handle = did_to_handle[reply_to_did] 373 else: 374 handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did) 375 if handle: 376 if handle.lower() in self.handle_lookup_error_messages: 377 self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}") 378 reply_to_handle = handle 379 did_to_handle[reply_to_did] = handle 380 else: 381 self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}") 382 383 384 post.update({"4CAT_metadata": { 385 "collected_at": datetime.now().timestamp(), 386 "query": query, 387 "rank": rank, 388 "mentions": mentions, 389 "reply_to": reply_to_handle if reply_to_handle else None, 390 }}) 391 rank += 1 392 yield post 393 total_posts += 1 394 395 # Check if there is a cursor for the next page 396 cursor = response['cursor'] 397 if max_posts != 0 and rank % (max_posts // 10) == 0: 398 self.dataset.update_status(f"Progress query {query}: {rank} posts collected out of {max_posts}") 399 self.dataset.update_progress(total_posts / (max_posts * num_queries)) 400 elif max_posts == 0 and rank % 1000 == 0: 401 self.dataset.update_status(f"Progress query {query}: {rank} posts collected") 402 403 if 0 < max_posts <= rank: 404 self.dataset.update_status( 405 f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}") 406 break 407 408 if not cursor: 409 if new_posts: 410 # Bluesky API seems to stop around 10000 posts and not return a cursor 411 # Re-query with the same query to get the next set of posts using last_date (set above) 412 self.dataset.log(f"Query {query}: {query_requests} requests") 413 queries.insert(0, query) 414 else: 415 # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done 416 
self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned") 417 418 if rank: 419 self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}") 420 break # No more pages, stop the loop 421 elif not items: 422 self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned") 423 break
Execute a query; get messages for given parameters
Generator that runs each query term against the Bluesky search API and yields posts as they are collected (the old note about wrapping execute_queries() with asyncio no longer applies).
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Posts, yielded one by one in the order they are collected from the API
    @staticmethod
    def map_item(item):
        """
        Convert item object to 4CAT-ready data object

        Flattens a collected Bluesky post (as dumped during collection,
        including the "4CAT_metadata" key added by get_items) into a flat
        dict of columns. Anything in the raw post that could not be mapped
        is collected in `unmapped_data` and reported via the MappedItem
        message so new API fields can be spotted.

        :param dict item:  item to parse
        :return dict:  4CAT-compatible item object
        """
        unmapped_data = []

        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"

        # field name differs depending on whether the record was dumped snake_case or camelCase
        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt")))

        # Tags
        tags = set()
        links = set()
        mentions_did = set()
        has_poll = False
        if item["record"].get("facets"):
            for facet in item["record"].get("facets"):
                for feature in facet.get("features"):
                    if feature.get("tag"):
                        tags.add(feature.get("tag"))
                    elif feature.get("uri"):
                        links.add(feature.get("uri"))
                    elif feature.get("did"):
                        mentions_did.add(feature.get("did"))
                    elif feature.get("number"):
                        has_poll = True
                    else:
                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
                if "features" not in facet:
                    unmapped_data.append({"loc": "record.facets", "obj": facet})

        # Embeds are in both the item and the record; so far these always contain same item
        embeded_links = set()
        embeded_images = set()
        image_references = set()
        quoted_link = None
        quoted_user = None
        quoted_ref = None
        # worklist: recordWithMedia embeds push their nested media back onto this queue
        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
        while possible_embeds:
            embed = possible_embeds.pop(0)
            if not embed:
                continue

            # embed type key differs between atproto model dumps ("py_type") and raw API JSON ("$type")
            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
                # contains post plus additional media
                for key, media_ob in embed.items():
                    possible_embeds.append(media_ob)

            elif "images" in embed:  # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
                for img_ob in embed["images"]:
                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
                    if img_link:
                        embeded_images.add(img_link)
                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
                        # BUT ref has already been obtained in other embeds...
                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
                    else:
                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
                if embed.get("thumbnail"):
                    embeded_images.add(embed.get("thumbnail"))
                elif embed.get("video", {}).get("ref", {}).get("link"):
                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
                else:
                    # No thumb for video
                    pass
            elif "record" in embed:  # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
                # Quoted post
                # Note: these may also contain images that would be seen, but are not part of the original post
                if embed["record"].get("author", embed["record"].get("creator")):
                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
                        # User may not be able to see original post
                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
                        else:
                            # New unknown
                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
                    else:
                        # author seems to be a quoted post while creator a quoted "list"
                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
                elif embed["record"].get("not_found"):
                    quoted_link = "DELETED"
                    # We do have the DID, but this information is not normally displayed
                    # quoted_user = embed["record"]['uri'].split("/")[2]
                elif embed["record"].get("detached"):
                    quoted_link = "REMOVED BY AUTHOR"
                else:
                    quoted_ref = embed["record"]['uri']
            elif "external" in embed:  # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
                if embed["external"].get("uri"):
                    embeded_links.add(embed["external"].get("uri"))
                if embed["external"].get("thumb"):
                    if isinstance(embed["external"]["thumb"], str):
                        embeded_images.add(embed["external"]["thumb"])
                    else:
                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
                else:
                    # No thumb for link
                    pass
            else:
                unmapped_data.append({"loc": f"embed.{py_type}",
                                      "obj": embed})

        # Replies allowed
        # https://docs.bsky.app/docs/tutorials/thread-gates
        # threadgate object does not appear to differentiate between the types of replies allowed
        replies_allowed = True if not item["threadgate"] else False

        # Labels (content moderation)
        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
        if item["record"].get("labels"):
            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])])

        # Language
        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))

        # Missing references
        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
        if quoted_ref:
            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})

        # Reference Posts (expanded to include handles during collection)
        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
        # Mentions
        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
        # Reply to
        replied_to_post = None
        replied_to_user = None
        if item["record"].get("reply"):
            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
                replied_to_user = item["4CAT_metadata"]["reply_to"]
            else:
                # Use DID, though this will not create a working link
                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])

        # These refer to slices of the text, but are also contained in the text or as an embed.
        # If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
        # if item["record"].get("entities"):
        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})

        # Author tags, not hashtags, not seen, very rarely used
        # if item["record"].get("tags"):
        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})

        return MappedItem({
            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
            "query": item["4CAT_metadata"]["query"],
            "rank": item["4CAT_metadata"]["rank"],
            "id": item["uri"],
            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
            "created_at": created_at.isoformat(),
            "author": item["author"]["handle"],
            "author_id": item["author"]["did"],
            "body": item["record"]["text"],
            "link": link,
            "tags": ",".join(tags),
            "like_count": item["like_count"],
            "quote_count": item["quote_count"],
            "reply_count": item["reply_count"],
            "repost_count": item["repost_count"],
            "quoted_post": quoted_link if quoted_link else "",
            "quoted_user": quoted_user if quoted_user else "",
            "replied_to_post": replied_to_post if replied_to_post else "",
            "replied_to_user": replied_to_user if replied_to_user else "",
            "replies_allowed": replies_allowed,
            "mentions": ",".join(mentions),
            "links": ",".join(embeded_links | links),
            "images": ",".join(embeded_images),
            "labels": ",".join(labels),
            "has_poll": has_poll,
            "languages": languages,

            "author_display_name": item["author"]["display_name"],
            "author_profile": author_profile,
            "author_avatar": item["author"]["avatar"],
            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),

            "timestamp": int(created_at.timestamp()),
        }, message=f"Bluesky new mappings: {unmapped_data}")
Convert item object to 4CAT-ready data object
Parameters
- dict item: item to parse
Returns
4CAT-compatible item object
618 @staticmethod 619 def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True): 620 """ 621 Bluesky datetime string to datetime object. 622 623 Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string. 624 625 :param str datetime_string: The datetime string to convert 626 :param str mode: The mode to return the datetime object in [datetime, iso_string] 627 :param bool raise_error: Raise error if unable to parse else return datetime_string 628 :return datetime/str: The converted datetime object 629 """ 630 try: 631 datetime_object = parser.isoparse(datetime_string) 632 except ValueError as e: 633 if raise_error: 634 raise e 635 return datetime_string 636 637 if mode == "datetime": 638 return datetime_object 639 elif mode == "iso_string": 640 return datetime_object.isoformat()
Bluesky datetime string to datetime object.
Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
Parameters
- str datetime_string: The datetime string to convert
- str mode: The mode to return the datetime object in [datetime, iso_string]
- bool raise_error: Raise error if unable to parse else return datetime_string
Returns
The converted datetime object
643 @staticmethod 644 def get_bsky_link(handle, post_id): 645 """ 646 Get link to Bluesky post 647 """ 648 return f"https://bsky.app/profile/{handle}/post/{post_id}"
Get link to Bluesky post
650 @staticmethod 651 def bsky_get_handle_from_did(client, did): 652 """ 653 Get handle from DID 654 """ 655 tries = 0 656 while True: 657 try: 658 user_profile = client.app.bsky.actor.get_profile({"actor": did}) 659 if user_profile: 660 return user_profile.handle 661 else: 662 return None 663 except (NetworkError, InvokeTimeoutError): 664 # Network error; try again 665 tries += 1 666 time.sleep(1) 667 if tries > 3: 668 return None 669 continue 670 except BadRequestError as e: 671 if e.response.content.message: 672 return e.response.content.message 673 return None
Get handle from DID
675 @staticmethod 676 def bsky_login(username, password, session_id, session_directory): 677 """ 678 Login to Bluesky 679 680 :param str username: Username for Bluesky 681 :param str password: Password for Bluesky 682 :param str session_id: Session ID to use for login 683 :param Path session_directory: Directory to save the session file 684 :return Client: Client object with login credentials 685 """ 686 if not session_id: 687 session_id = SearchBluesky.create_session_id(username, password) 688 elif (not username or not password) and not session_id: 689 raise ValueError("Must provide both username and password or else session_id.") 690 691 session_path = session_directory.joinpath("bsky_" + session_id + ".session") 692 693 def on_session_change(event: SessionEvent, session: Session) -> None: 694 """ 695 Save session to file; atproto session change event handler should keep the session up to date 696 697 https://atproto.blue/en/latest/atproto_client/auth.html 698 """ 699 print('Session changed:', event, repr(session)) 700 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 701 print('Saving changed session') 702 with session_path.open("w") as session_file: 703 session_file.write(session.export()) 704 705 client = Client() 706 client.on_session_change(on_session_change) 707 if session_path.exists(): 708 with session_path.open() as session_file: 709 session_string = session_file.read() 710 try: 711 client.login(session_string=session_string) 712 except BadRequestError as e: 713 if e.response.content.message == 'Token has expired': 714 # Token has expired; try to refresh 715 if username and password: 716 client.login(login=username, password=password) 717 else: 718 raise ValueError("Session token has expired; please re-login with username and password.") 719 else: 720 # Were not able to log in via session string; login with username and password 721 client.login(login=username, password=password) 722 return client
Login to Bluesky
Parameters
- str username: Username for Bluesky
- str password: Password for Bluesky
- str session_id: Session ID to use for login
- Path session_directory: Directory to save the session file
Returns
Client object with login credentials
724 @staticmethod 725 def create_session_id(username, password): 726 """ 727 Generate a filename for the session file 728 729 This is a combination of username and password, but hashed 730 so that one cannot actually derive someone's information. 731 732 :param str username: Username for Bluesky 733 :param str password: Password for Bluesky 734 :return str: A hash value derived from the input 735 """ 736 hash_base = username.strip() + str(password).strip() 737 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Generate a filename for the session file
This is a combination of username and password, but hashed so that one cannot actually derive someone's information.
Parameters
- str username: Username for Bluesky
- str password: Password for Bluesky
Returns
A hash value derived from the input
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor