Edit on GitHub

datasources.bsky.search_bsky

Collect Bluesky posts

  1"""
  2Collect Bluesky posts
  3"""
  4import hashlib
  5import time
  6from datetime import datetime
  7from pathlib import Path
  8
  9from dateutil import parser
 10
 11from atproto import Client, Session, SessionEvent
 12from atproto_client.exceptions import UnauthorizedError, BadRequestError, InvokeTimeoutError, RequestException, \
 13    ModelError, NetworkError
 14
 15from backend.lib.search import Search
 16from common.lib.exceptions import QueryParametersException, QueryNeedsExplicitConfirmationException, \
 17    ProcessorInterruptedException
 18from common.lib.helpers import timify_long
 19from common.lib.user_input import UserInput
 20from common.config_manager import config
 21from common.lib.item_mapping import MappedItem, MissingMappedField
 22
 23class SearchBluesky(Search):
 24    """
 25    Search for posts in Bluesky
 26    """
 27    type = "bsky-search"  # job ID
 28    category = "Search"  # category
 29    title = "Bluesky Search"  # title displayed in UI
 30    description = "Collects Bluesky posts via its API."  # description displayed in UI
 31    extension = "ndjson"  # extension of result file, used internally and in UI
 32    is_local = False  # Whether this datasource is locally scraped
 33    is_static = False  # Whether this datasource is still updated
 34
 35    config = {
 36        "bsky-search.max_results": {
 37            "type": UserInput.OPTION_TEXT,
 38            "help": "Maximum results per query",
 39            "coerce_type": int,
 40            "min": 0,
 41            "default": 50000,
 42            "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited."
 43        }
 44    }
 45
 46    handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"]
 47
 48    @classmethod
 49    def get_options(cls, parent_dataset=None, user=None):
 50        """
 51        Get processor options
 52
 53        Just updates the description of the entities field based on the
 54        configured max entities.
 55
 56        :param DataSet parent_dataset:  An object representing the dataset that
 57          the processor would be run on
 58        :param User user:  Flask user the options will be displayed for, in
 59          case they are requested for display in the 4CAT web interface. This can
 60          be used to show some options only to privileges users.
 61        """
 62        options = {
 63            "intro": {
 64                "type": UserInput.OPTION_INFO,
 65                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
 66                        "and stored there while data is fetched. After the dataset has been created your credentials "
 67                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
 68            },
 69            "username": {
 70                "type": UserInput.OPTION_TEXT,
 71                "help": "Bluesky Username",
 72                "cache": True,
 73                "sensitive": True,
 74            },
 75            "password": {
 76                "type": UserInput.OPTION_TEXT,
 77                "help": "Bluesky Password",
 78                "cache": True, # tells the frontend to cache this value
 79                "sensitive": True, # tells the backend to delete this value after use
 80                "password": True, # tells the frontend this is a password type
 81            },
 82            "divider": {
 83                "type": UserInput.OPTION_DIVIDER
 84            },
 85            "query": {
 86                "type": UserInput.OPTION_TEXT_LARGE,
 87                "help": "Search Queries",
 88                "tooltip": "Separate with commas or line breaks."
 89            },
 90            "max_posts": {
 91                "type": UserInput.OPTION_TEXT,
 92                "help": "Max posts per query",
 93                "min": 1,
 94                "default": 100
 95            },
 96            "daterange": {
 97                "type": UserInput.OPTION_DATERANGE,
 98                "help": "Date range",
 99                "tooltip": "The date range for the search. No date range will search all posts."
100            },
101        }
102
103        # Update the max_posts setting from config
104        max_posts = int(config.get('bsky-search.max_results', 100, user=user))
105        if max_posts == 0:
106            # This is potentially madness
107            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
108            options['max_posts']['min'] = 0
109        else:
110            options["max_posts"]["max"] = max_posts
111            options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts
112
113        return options
114
115    @staticmethod
116    def validate_query(query, request, user):
117        """
118        Validate Bluesky query
119
120        :param dict query:  Query parameters, from client-side.
121        :param request:  Flask request
122        :param User user:  User object of user who has submitted the query
123        :return dict:  Safe query parameters
124        """
125        # no query 4 u
126        if not query.get("query", "").strip():
127            raise QueryParametersException("You must provide a search query.")
128
129        if not query.get("username", None) or not query.get("password", None) :
130            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")
131
132        # Test login credentials
133        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
134        try:
135            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id)
136        except UnauthorizedError as e:
137            raise QueryParametersException("Invalid Bluesky login credentials.")
138        except RequestException as e:
139            if e.response.content.message == 'Rate Limit Exceeded':
140                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
141                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
142            else:
143                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")
144
145        # sanitize query
146        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]
147
148        # the dates need to make sense as a range to search within
149        min_date, max_date = query.get("daterange")
150        if min_date and max_date and min_date > max_date:
151            raise QueryParametersException("The start date must be before the end date.")
152
153        # Only check this if not already confirmed by the frontend
154        posts_per_second = 55 # gathered from simply checking start/end times of logs
155        if not query.get("frontend-confirm"):
156            # Estimate is not returned; use max_posts as a rough estimate
157            max_posts = query.get("max_posts", 100)
158            expected_tweets = query.get("max_posts", 100) * len(sanitized_query)
159            # Warn if process may take more than ~1 hours
160            if expected_tweets > (posts_per_second * 3600):
161                expected_time = timify_long(expected_tweets / posts_per_second)
162                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
163            elif max_posts == 0 and not min_date:
164                raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
165            elif max_posts == 0:
166                raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")
167
168        return {
169            "max_posts": query.get("max_posts"),
170            "query": ",".join(sanitized_query),
171            "username": query.get("username"),
172            "password": query.get("password"),
173            "session_id": session_id,
174            "min_date": min_date,
175            "max_date": max_date,
176        }
177
178    def get_items(self, query):
179        """
180        Execute a query; get messages for given parameters
181
182        Basically a wrapper around execute_queries() to call it with asyncio.
183
184        :param dict query:  Query parameters, as part of the DataSet object
185        :return list:  Posts, sorted by thread and post ID, in ascending order
186        """
187        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
188            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")
189
190        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
191        try:
192            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id)
193        except (UnauthorizedError, RequestException, BadRequestError) as e:
194            self.dataset.log(f"Bluesky login failed: {e}")
195            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")
196
197        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")
198
199        max_posts = query.get("max_posts", 100)
200        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts
201
202        # Handle reference mapping; user references use did instead of dynamic handle
203        did_to_handle = {}
204
205        query_parameters = {
206            "limit": limit,
207        }
208
209        # Add start and end dates if provided
210        if self.parameters.get("min_date"):
211            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
212        if self.parameters.get("max_date"):
213            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
214
215        queries = query.get("query").split(",")
216        num_queries = len(queries)
217        total_posts = 0
218        i = 0
219        last_query = None
220        last_date = None
221        while queries:
222            query = queries.pop(0)
223            if query == last_query:
224                # Check if there are continued posts from the last query
225                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
226                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
227            else:
228                # New query
229                query_post_ids = set()
230                i += 1
231                rank = 0
232                last_query = query
233                last_date = None
234                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
235                query_requests = 0
236
237            query_parameters["q"] = query
238            cursor = None  # Start with no cursor (first page)
239            search_for_invalid_post = False
240            invalid_post_counter = 0
241            while True:
242                if self.interrupted:
243                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
244                # Query posts, including pagination (cursor for next page)
245                tries = 0
246                response = None
247                while tries < 3:
248                    query_parameters["cursor"] = cursor
249                    try:
250                        response = client.app.bsky.feed.search_posts(params=query_parameters)
251                        break
252                    except ModelError as e:
253                        # Post validation error; one post is unable to be read
254                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
255                        # order to collect post by post, invalid post is identified again, we switch back to higher
256                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
257                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
258                        # add the query back to the queue with a new "until" date to continue the query
259                        # https://github.com/bluesky-social/atproto/issues/3446
260                        if not search_for_invalid_post:
261                            # New invalid post, search and skip
262                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
263                            search_for_invalid_post = True
264                            # Currently we must search post by post to find the invalid post
265                            query_parameters["limit"] = 1
266                        else:
267                            # Found invalid post, skip, reset counters
268                            self.dataset.log(
269                                f"Invalid post identified; skipping and continue with query as normal: {e}")
270                            search_for_invalid_post = False
271                            # Reset limit to normal
272                            query_parameters["limit"] = limit
273                            invalid_post_counter = 0
274                            cursor = str(int(cursor) + 1) if cursor else None
275                        # Re-query with new cursor & limit
276                        continue
277
278                    except InvokeTimeoutError as e:
279                        # Timeout error, but can occur for odd queries with no results
280                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
281                        time.sleep(1)
282                        tries += 2
283                        continue
284                    except NetworkError as e:
285                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
286                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
287                        time.sleep(1 + (tries * 10))
288                        queries.insert(0, query)
289                        break
290
291                if not response:
292                    # Expected from NetworkError, but the query will have been added back to the queue
293                    # If not, then there was a problem with the query
294                    if len(queries) == 0:
295                        self.dataset.update_status(f"Error collecting posts from Bluesky; see log for details", is_final=True)
296                    if query not in queries:
297                        # Query was not added back; there was an unexpected issue with the query itself
298                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
299                    break
300
301                query_requests += 1
302                items = response['posts'] if hasattr(response, 'posts') else []
303
304                if search_for_invalid_post:
305                    invalid_post_counter += 1
306                    if invalid_post_counter >= 100:
307                        #  Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
308                        self.dataset.log(f"Unable to identify invalid post; discontinuing search")
309                        query_parameters["limit"] = limit
310                        search_for_invalid_post = False
311                        invalid_post_counter = 0
312
313                    if not items:
314                        # Sometimes no post is returned, but there still may be posts following
315                        self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
316                        # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
317                        cursor = str(int(cursor) + 1) if cursor else None
318                        continue
319
320                new_posts = 0
321                # Handle the posts
322                for item in items:
323                    if 0 < max_posts <= rank:
324                        break
325
326                    if self.interrupted:
327                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
328
329                    post = item.model_dump()
330                    post_id = post["uri"]
331                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
332                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
333                    if post_id in query_post_ids:
334                        # Skip duplicate posts
335                        continue
336
337                    new_posts += 1
338                    query_post_ids.add(post_id)
339
340                    # Add user handles from references
341                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
342                    # Mentions
343                    mentions = []
344                    if post["record"].get("facets"):
345                        for facet in post["record"]["facets"]:
346                            for feature in facet.get("features", {}):
347                                if feature.get("did"):
348                                    if feature["did"] in did_to_handle:
349                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
350                                    else:
351                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
352                                        if handle:
353                                            if handle.lower() in self.handle_lookup_error_messages:
354                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
355                                            mentions.append({"did": feature["did"], "handle": handle})
356                                            did_to_handle[feature["did"]] = handle
357                                        else:
358                                            mentions.append({"did": feature["did"], "handle": None})
359                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
360                    # Reply to
361                    reply_to_handle = None
362                    if post["record"].get("reply"):
363                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
364                        if reply_to_did in did_to_handle:
365                            reply_to_handle = did_to_handle[reply_to_did]
366                        else:
367                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
368                            if handle:
369                                if handle.lower() in self.handle_lookup_error_messages:
370                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
371                                reply_to_handle = handle
372                                did_to_handle[reply_to_did] = handle
373                            else:
374                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")
375
376
377                    post.update({"4CAT_metadata": {
378                        "collected_at": datetime.now().timestamp(),
379                        "query": query,
380                        "rank": rank,
381                        "mentions": mentions,
382                        "reply_to": reply_to_handle if reply_to_handle else None,
383                    }})
384                    rank += 1
385                    yield post
386                    total_posts += 1
387
388                # Check if there is a cursor for the next page
389                cursor = response['cursor']
390                if max_posts != 0:
391                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
392
393                if 0 < max_posts <= rank:
394                    self.dataset.update_status(
395                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
396                    break
397
398                if not cursor:
399                    if new_posts:
400                        # Bluesky API seems to stop around 10000 posts and not return a cursor
401                        # Re-query with the same query to get the next set of posts using last_date (set above)
402                        self.dataset.log(f"Query {query}: {query_requests} requests")
403                        queries.insert(0, query)
404                    else:
405                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
406                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
407
408                    if rank:
409                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
410                    break  # No more pages, stop the loop
411                elif not items:
412                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
413                    break
414
415    @staticmethod
416    def map_item(item):
417        """
418        Convert item object to 4CAT-ready data object
419
420        :param dict item:  item to parse
421        :return dict:  4CAT-compatible item object
422        """
423        unmapped_data = []
424
425        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
426        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
427        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"
428
429        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt")))
430
431        # Tags
432        tags = set()
433        links = set()
434        mentions_did = set()
435        has_poll = False
436        if item["record"].get("facets"):
437            for facet in item["record"].get("facets"):
438                for feature in facet.get("features"):
439                    if feature.get("tag"):
440                        tags.add(feature.get("tag"))
441                    elif feature.get("uri"):
442                        links.add(feature.get("uri"))
443                    elif feature.get("did"):
444                        mentions_did.add(feature.get("did"))
445                    elif feature.get("number"):
446                        has_poll = True
447                    else:
448                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
449                if "features" not in facet:
450                    unmapped_data.append({"loc": "record.facets", "obj": facet})
451
452        # Embeds are in both the item and the record; so far these always contain same item
453        embeded_links = set()
454        embeded_images = set()
455        image_references = set()
456        quoted_link = None
457        quoted_user = None
458        quoted_ref = None
459        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
460        while possible_embeds:
461            embed = possible_embeds.pop(0)
462            if not embed:
463                continue
464
465            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
466            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
467                # contains post plus additional media
468                for key, media_ob in embed.items():
469                    possible_embeds.append(media_ob)
470
471            elif "images" in embed: # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
472                for img_ob in embed["images"]:
473                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
474                    if img_link:
475                        embeded_images.add(img_link)
476                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
477                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
478                        # BUT ref has already been obtained in other embeds...
479                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
480                    else:
481                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
482            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
483                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
484                if embed.get("thumbnail"):
485                    embeded_images.add(embed.get("thumbnail"))
486                elif embed.get("video", {}).get("ref", {}).get("link"):
487                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
488                else:
489                    # No thumb for video
490                    pass
491            elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
492                # Quoted post
493                # Note: these may also contain images that would be seen, but are not part of the original post
494                if embed["record"].get("author", embed["record"].get("creator")):
495                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
496                        # User may not be able to see original post
497                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
498                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
499                        else:
500                            # New unknown
501                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
502                    else:
503                        # author seems to be a quoted post while creator a quoted "list"
504                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
505                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
506                elif embed["record"].get("not_found"):
507                    quoted_link = "DELETED"
508                    # We do have the DID, but this information is not normally displayed
509                    # quoted_user = embed["record"]['uri'].split("/")[2]
510                elif embed["record"].get("detached"):
511                    quoted_link = "REMOVED BY AUTHOR"
512                else:
513                    quoted_ref = embed["record"]['uri']
514            elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
515                if embed["external"].get("uri"):
516                    embeded_links.add(embed["external"].get("uri"))
517                if embed["external"].get("thumb"):
518                    if isinstance(embed["external"]["thumb"], str):
519                        embeded_images.add(embed["external"]["thumb"])
520                    else:
521                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
522                else:
523                    # No thumb for link
524                    pass
525            else:
526                unmapped_data.append({"loc": f"embed.{py_type}",
527                                      "obj": embed})
528
529        # Replies allowed
530        # https://docs.bsky.app/docs/tutorials/thread-gates
531        # threadgate object does not appear to differentiate between the types of replies allowed
532        replies_allowed = True if not item["threadgate"] else False
533
534        # Labels (content moderation)
535        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
536        if item["record"].get("labels"):
537            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])])
538
539        # Language
540        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))
541
542        # Missing references
543        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
544            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
545        if quoted_ref:
546            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
547                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})
548
549        # Reference Posts (expanded to include handles during collection)
550        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
551        # Mentions
552        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
553        # Reply to
554        replied_to_post = None
555        replied_to_user = None
556        if item["record"].get("reply"):
557            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
558                replied_to_user = item["4CAT_metadata"]["reply_to"]
559            else:
560                # Use DID, though this will not create a working link
561                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
562            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])
563
564        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
565        # if item["record"].get("entities"):
566        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})
567
568        # Author tags, not hashtags, not seen, very rarely used
569        # if item["record"].get("tags"):
570        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})
571
572        if unmapped_data:
573            # TODO: Use MappedItem message; currently it is not called...
574            config.with_db()
575            config.db.log.warning(f"Bluesky new mappings ({item['uri']}): {unmapped_data}")
576
577        return MappedItem({
578            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
579            "query": item["4CAT_metadata"]["query"],
580            "rank": item["4CAT_metadata"]["rank"],
581            "id": item["uri"],
582            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
583            "created_at": created_at.isoformat(),
584            "author": item["author"]["handle"],
585            "author_id": item["author"]["did"],
586            "body": item["record"]["text"],
587            "link": link,
588            "tags": ",".join(tags),
589            "like_count": item["like_count"],
590            "quote_count": item["quote_count"],
591            "reply_count": item["reply_count"],
592            "repost_count": item["repost_count"],
593            "quoted_post": quoted_link if quoted_link else "",
594            "quoted_user": quoted_user if quoted_user else "",
595            "replied_to_post": replied_to_post if replied_to_post else "",
596            "replied_to_user": replied_to_user if replied_to_user else "",
597            "replies_allowed": replies_allowed,
598            "mentions": ",".join(mentions),
599            "links": ",".join(embeded_links | links),
600            "images": ",".join(embeded_images),
601            "labels": ",".join(labels),
602            "has_poll": has_poll,
603            "languages": languages,
604
605            "author_display_name": item["author"]["display_name"],
606            "author_profile": author_profile,
607            "author_avatar": item["author"]["avatar"],
608            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),
609
610            "timestamp": created_at.timestamp(),
611        }, message=f"Bluesky new mappings: {unmapped_data}")
612
613    @staticmethod
614    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
615        """
616        Bluesky datetime string to datetime object.
617
618        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
619
620        :param str datetime_string:  The datetime string to convert
621        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
622        :param bool raise_error:  Raise error if unable to parse else return datetime_string
623        :return datetime/str:  The converted datetime object
624        """
625        try:
626            datetime_object = parser.isoparse(datetime_string)
627        except ValueError as e:
628            if raise_error:
629                raise e
630            return datetime_string
631        
632        if mode == "datetime":
633            return datetime_object
634        elif mode == "iso_string":
635            return datetime_object.isoformat()
636
637
638    @staticmethod
639    def get_bsky_link(handle, post_id):
640        """
641        Get link to Bluesky post
642        """
643        return f"https://bsky.app/profile/{handle}/post/{post_id}"
644
645    @staticmethod
646    def bsky_get_handle_from_did(client, did):
647        """
648        Get handle from DID
649        """
650        tries = 0
651        while True:
652            try:
653                user_profile = client.app.bsky.actor.get_profile({"actor": did})
654                if user_profile:
655                    return user_profile.handle
656                else:
657                    return None
658            except InvokeTimeoutError as e:
659                # Network error; try again
660                tries += 1
661                time.sleep(1)
662                if tries > 3:
663                    return None
664                continue
665            except BadRequestError as e:
666                if e.response.content.message:
667                    return e.response.content.message
668                return None
669
670    @staticmethod
671    def bsky_login(username, password, session_id):
672        """
673        Login to Bluesky
674
675        :param str username:  Username for Bluesky
676        :param str password:  Password for Bluesky
677        :param str session_id:  Session ID to use for login
678        :return Client:  Client object with login credentials
679        """
680        if not session_id:
681            session_id = SearchBluesky.create_session_id(username, password)
682        elif (not username or not password) and not session_id:
683            raise ValueError("Must provide both username and password or else session_id.")
684
685        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), "bsky_" + session_id + ".session")
686
687        def on_session_change(event: SessionEvent, session: Session) -> None:
688            """
689            Save session to file; atproto session change event handler should keep the session up to date
690
691            https://atproto.blue/en/latest/atproto_client/auth.html
692            """
693            print('Session changed:', event, repr(session))
694            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
695                print('Saving changed session')
696                with session_path.open("w") as session_file:
697                    session_file.write(session.export())
698
699        client = Client()
700        client.on_session_change(on_session_change)
701        if session_path.exists():
702            with session_path.open() as session_file:
703                session_string = session_file.read()
704                client.login(session_string=session_string)
705        else:
706            # Were not able to log in via session string; login with username and password
707            client.login(login=username, password=password)
708        return client
709
710    @staticmethod
711    def create_session_id(username, password):
712        """
713        Generate a filename for the session file
714
715        This is a combination of username and password, but hashed
716        so that one cannot actually derive someone's information.
717
718        :param str username:  Username for Bluesky
719        :param str password:  Password for Bluesky
720        :return str: A hash value derived from the input
721        """
722        hash_base = username.strip() + str(password).strip()
723        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
class SearchBluesky(backend.lib.search.Search):
 24class SearchBluesky(Search):
 25    """
 26    Search for posts in Bluesky
 27    """
 28    type = "bsky-search"  # job ID
 29    category = "Search"  # category
 30    title = "Bluesky Search"  # title displayed in UI
 31    description = "Collects Bluesky posts via its API."  # description displayed in UI
 32    extension = "ndjson"  # extension of result file, used internally and in UI
 33    is_local = False  # Whether this datasource is locally scraped
 34    is_static = False  # Whether this datasource is still updated
 35
 36    config = {
 37        "bsky-search.max_results": {
 38            "type": UserInput.OPTION_TEXT,
 39            "help": "Maximum results per query",
 40            "coerce_type": int,
 41            "min": 0,
 42            "default": 50000,
 43            "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited."
 44        }
 45    }
 46
 47    handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"]
 48
 49    @classmethod
 50    def get_options(cls, parent_dataset=None, user=None):
 51        """
 52        Get processor options
 53
 54        Just updates the description of the entities field based on the
 55        configured max entities.
 56
 57        :param DataSet parent_dataset:  An object representing the dataset that
 58          the processor would be run on
 59        :param User user:  Flask user the options will be displayed for, in
 60          case they are requested for display in the 4CAT web interface. This can
 61          be used to show some options only to privileges users.
 62        """
 63        options = {
 64            "intro": {
 65                "type": UserInput.OPTION_INFO,
 66                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
 67                        "and stored there while data is fetched. After the dataset has been created your credentials "
 68                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
 69            },
 70            "username": {
 71                "type": UserInput.OPTION_TEXT,
 72                "help": "Bluesky Username",
 73                "cache": True,
 74                "sensitive": True,
 75            },
 76            "password": {
 77                "type": UserInput.OPTION_TEXT,
 78                "help": "Bluesky Password",
 79                "cache": True, # tells the frontend to cache this value
 80                "sensitive": True, # tells the backend to delete this value after use
 81                "password": True, # tells the frontend this is a password type
 82            },
 83            "divider": {
 84                "type": UserInput.OPTION_DIVIDER
 85            },
 86            "query": {
 87                "type": UserInput.OPTION_TEXT_LARGE,
 88                "help": "Search Queries",
 89                "tooltip": "Separate with commas or line breaks."
 90            },
 91            "max_posts": {
 92                "type": UserInput.OPTION_TEXT,
 93                "help": "Max posts per query",
 94                "min": 1,
 95                "default": 100
 96            },
 97            "daterange": {
 98                "type": UserInput.OPTION_DATERANGE,
 99                "help": "Date range",
100                "tooltip": "The date range for the search. No date range will search all posts."
101            },
102        }
103
104        # Update the max_posts setting from config
105        max_posts = int(config.get('bsky-search.max_results', 100, user=user))
106        if max_posts == 0:
107            # This is potentially madness
108            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
109            options['max_posts']['min'] = 0
110        else:
111            options["max_posts"]["max"] = max_posts
112            options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts
113
114        return options
115
116    @staticmethod
117    def validate_query(query, request, user):
118        """
119        Validate Bluesky query
120
121        :param dict query:  Query parameters, from client-side.
122        :param request:  Flask request
123        :param User user:  User object of user who has submitted the query
124        :return dict:  Safe query parameters
125        """
126        # no query 4 u
127        if not query.get("query", "").strip():
128            raise QueryParametersException("You must provide a search query.")
129
130        if not query.get("username", None) or not query.get("password", None) :
131            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")
132
133        # Test login credentials
134        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
135        try:
136            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id)
137        except UnauthorizedError as e:
138            raise QueryParametersException("Invalid Bluesky login credentials.")
139        except RequestException as e:
140            if e.response.content.message == 'Rate Limit Exceeded':
141                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
142                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
143            else:
144                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")
145
146        # sanitize query
147        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]
148
149        # the dates need to make sense as a range to search within
150        min_date, max_date = query.get("daterange")
151        if min_date and max_date and min_date > max_date:
152            raise QueryParametersException("The start date must be before the end date.")
153
154        # Only check this if not already confirmed by the frontend
155        posts_per_second = 55 # gathered from simply checking start/end times of logs
156        if not query.get("frontend-confirm"):
157            # Estimate is not returned; use max_posts as a rough estimate
158            max_posts = query.get("max_posts", 100)
159            expected_tweets = query.get("max_posts", 100) * len(sanitized_query)
160            # Warn if process may take more than ~1 hours
161            if expected_tweets > (posts_per_second * 3600):
162                expected_time = timify_long(expected_tweets / posts_per_second)
163                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
164            elif max_posts == 0 and not min_date:
165                raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
166            elif max_posts == 0:
167                raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")
168
169        return {
170            "max_posts": query.get("max_posts"),
171            "query": ",".join(sanitized_query),
172            "username": query.get("username"),
173            "password": query.get("password"),
174            "session_id": session_id,
175            "min_date": min_date,
176            "max_date": max_date,
177        }
178
179    def get_items(self, query):
180        """
181        Execute a query; get messages for given parameters
182
183        Basically a wrapper around execute_queries() to call it with asyncio.
184
185        :param dict query:  Query parameters, as part of the DataSet object
186        :return list:  Posts, sorted by thread and post ID, in ascending order
187        """
188        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
189            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")
190
191        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
192        try:
193            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id)
194        except (UnauthorizedError, RequestException, BadRequestError) as e:
195            self.dataset.log(f"Bluesky login failed: {e}")
196            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")
197
198        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")
199
200        max_posts = query.get("max_posts", 100)
201        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts
202
203        # Handle reference mapping; user references use did instead of dynamic handle
204        did_to_handle = {}
205
206        query_parameters = {
207            "limit": limit,
208        }
209
210        # Add start and end dates if provided
211        if self.parameters.get("min_date"):
212            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
213        if self.parameters.get("max_date"):
214            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
215
216        queries = query.get("query").split(",")
217        num_queries = len(queries)
218        total_posts = 0
219        i = 0
220        last_query = None
221        last_date = None
222        while queries:
223            query = queries.pop(0)
224            if query == last_query:
225                # Check if there are continued posts from the last query
226                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
227                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
228            else:
229                # New query
230                query_post_ids = set()
231                i += 1
232                rank = 0
233                last_query = query
234                last_date = None
235                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
236                query_requests = 0
237
238            query_parameters["q"] = query
239            cursor = None  # Start with no cursor (first page)
240            search_for_invalid_post = False
241            invalid_post_counter = 0
242            while True:
243                if self.interrupted:
244                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
245                # Query posts, including pagination (cursor for next page)
246                tries = 0
247                response = None
248                while tries < 3:
249                    query_parameters["cursor"] = cursor
250                    try:
251                        response = client.app.bsky.feed.search_posts(params=query_parameters)
252                        break
253                    except ModelError as e:
254                        # Post validation error; one post is unable to be read
255                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
256                        # order to collect post by post, invalid post is identified again, we switch back to higher
257                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
258                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
259                        # add the query back to the queue with a new "until" date to continue the query
260                        # https://github.com/bluesky-social/atproto/issues/3446
261                        if not search_for_invalid_post:
262                            # New invalid post, search and skip
263                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
264                            search_for_invalid_post = True
265                            # Currently we must search post by post to find the invalid post
266                            query_parameters["limit"] = 1
267                        else:
268                            # Found invalid post, skip, reset counters
269                            self.dataset.log(
270                                f"Invalid post identified; skipping and continue with query as normal: {e}")
271                            search_for_invalid_post = False
272                            # Reset limit to normal
273                            query_parameters["limit"] = limit
274                            invalid_post_counter = 0
275                            cursor = str(int(cursor) + 1) if cursor else None
276                        # Re-query with new cursor & limit
277                        continue
278
279                    except InvokeTimeoutError as e:
280                        # Timeout error, but can occur for odd queries with no results
281                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
282                        time.sleep(1)
283                        tries += 2
284                        continue
285                    except NetworkError as e:
286                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
287                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
288                        time.sleep(1 + (tries * 10))
289                        queries.insert(0, query)
290                        break
291
292                if not response:
293                    # Expected from NetworkError, but the query will have been added back to the queue
294                    # If not, then there was a problem with the query
295                    if len(queries) == 0:
296                        self.dataset.update_status(f"Error collecting posts from Bluesky; see log for details", is_final=True)
297                    if query not in queries:
298                        # Query was not added back; there was an unexpected issue with the query itself
299                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
300                    break
301
302                query_requests += 1
303                items = response['posts'] if hasattr(response, 'posts') else []
304
305                if search_for_invalid_post:
306                    invalid_post_counter += 1
307                    if invalid_post_counter >= 100:
308                        #  Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
309                        self.dataset.log(f"Unable to identify invalid post; discontinuing search")
310                        query_parameters["limit"] = limit
311                        search_for_invalid_post = False
312                        invalid_post_counter = 0
313
314                    if not items:
315                        # Sometimes no post is returned, but there still may be posts following
316                        self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
317                        # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
318                        cursor = str(int(cursor) + 1) if cursor else None
319                        continue
320
321                new_posts = 0
322                # Handle the posts
323                for item in items:
324                    if 0 < max_posts <= rank:
325                        break
326
327                    if self.interrupted:
328                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
329
330                    post = item.model_dump()
331                    post_id = post["uri"]
332                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
333                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
334                    if post_id in query_post_ids:
335                        # Skip duplicate posts
336                        continue
337
338                    new_posts += 1
339                    query_post_ids.add(post_id)
340
341                    # Add user handles from references
342                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
343                    # Mentions
344                    mentions = []
345                    if post["record"].get("facets"):
346                        for facet in post["record"]["facets"]:
347                            for feature in facet.get("features", {}):
348                                if feature.get("did"):
349                                    if feature["did"] in did_to_handle:
350                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
351                                    else:
352                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
353                                        if handle:
354                                            if handle.lower() in self.handle_lookup_error_messages:
355                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
356                                            mentions.append({"did": feature["did"], "handle": handle})
357                                            did_to_handle[feature["did"]] = handle
358                                        else:
359                                            mentions.append({"did": feature["did"], "handle": None})
360                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
361                    # Reply to
362                    reply_to_handle = None
363                    if post["record"].get("reply"):
364                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
365                        if reply_to_did in did_to_handle:
366                            reply_to_handle = did_to_handle[reply_to_did]
367                        else:
368                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
369                            if handle:
370                                if handle.lower() in self.handle_lookup_error_messages:
371                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
372                                reply_to_handle = handle
373                                did_to_handle[reply_to_did] = handle
374                            else:
375                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")
376
377
378                    post.update({"4CAT_metadata": {
379                        "collected_at": datetime.now().timestamp(),
380                        "query": query,
381                        "rank": rank,
382                        "mentions": mentions,
383                        "reply_to": reply_to_handle if reply_to_handle else None,
384                    }})
385                    rank += 1
386                    yield post
387                    total_posts += 1
388
389                # Check if there is a cursor for the next page
390                cursor = response['cursor']
391                if max_posts != 0:
392                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
393
394                if 0 < max_posts <= rank:
395                    self.dataset.update_status(
396                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
397                    break
398
399                if not cursor:
400                    if new_posts:
401                        # Bluesky API seems to stop around 10000 posts and not return a cursor
402                        # Re-query with the same query to get the next set of posts using last_date (set above)
403                        self.dataset.log(f"Query {query}: {query_requests} requests")
404                        queries.insert(0, query)
405                    else:
406                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
407                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
408
409                    if rank:
410                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
411                    break  # No more pages, stop the loop
412                elif not items:
413                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
414                    break
415
416    @staticmethod
417    def map_item(item):
418        """
419        Convert item object to 4CAT-ready data object
420
421        :param dict item:  item to parse
422        :return dict:  4CAT-compatible item object
423        """
424        unmapped_data = []
425
426        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
427        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
428        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"
429
430        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt")))
431
432        # Tags
433        tags = set()
434        links = set()
435        mentions_did = set()
436        has_poll = False
437        if item["record"].get("facets"):
438            for facet in item["record"].get("facets"):
439                for feature in facet.get("features"):
440                    if feature.get("tag"):
441                        tags.add(feature.get("tag"))
442                    elif feature.get("uri"):
443                        links.add(feature.get("uri"))
444                    elif feature.get("did"):
445                        mentions_did.add(feature.get("did"))
446                    elif feature.get("number"):
447                        has_poll = True
448                    else:
449                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
450                if "features" not in facet:
451                    unmapped_data.append({"loc": "record.facets", "obj": facet})
452
453        # Embeds are in both the item and the record; so far these always contain same item
454        embeded_links = set()
455        embeded_images = set()
456        image_references = set()
457        quoted_link = None
458        quoted_user = None
459        quoted_ref = None
460        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
461        while possible_embeds:
462            embed = possible_embeds.pop(0)
463            if not embed:
464                continue
465
466            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
467            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
468                # contains post plus additional media
469                for key, media_ob in embed.items():
470                    possible_embeds.append(media_ob)
471
472            elif "images" in embed: # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
473                for img_ob in embed["images"]:
474                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
475                    if img_link:
476                        embeded_images.add(img_link)
477                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
478                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
479                        # BUT ref has already been obtained in other embeds...
480                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
481                    else:
482                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
483            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
484                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
485                if embed.get("thumbnail"):
486                    embeded_images.add(embed.get("thumbnail"))
487                elif embed.get("video", {}).get("ref", {}).get("link"):
488                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
489                else:
490                    # No thumb for video
491                    pass
492            elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
493                # Quoted post
494                # Note: these may also contain images that would be seen, but are not part of the original post
495                if embed["record"].get("author", embed["record"].get("creator")):
496                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
497                        # User may not be able to see original post
498                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
499                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
500                        else:
501                            # New unknown
502                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
503                    else:
504                        # author seems to be a quoted post while creator a quoted "list"
505                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
506                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
507                elif embed["record"].get("not_found"):
508                    quoted_link = "DELETED"
509                    # We do have the DID, but this information is not normally displayed
510                    # quoted_user = embed["record"]['uri'].split("/")[2]
511                elif embed["record"].get("detached"):
512                    quoted_link = "REMOVED BY AUTHOR"
513                else:
514                    quoted_ref = embed["record"]['uri']
515            elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
516                if embed["external"].get("uri"):
517                    embeded_links.add(embed["external"].get("uri"))
518                if embed["external"].get("thumb"):
519                    if isinstance(embed["external"]["thumb"], str):
520                        embeded_images.add(embed["external"]["thumb"])
521                    else:
522                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
523                else:
524                    # No thumb for link
525                    pass
526            else:
527                unmapped_data.append({"loc": f"embed.{py_type}",
528                                      "obj": embed})
529
530        # Replies allowed
531        # https://docs.bsky.app/docs/tutorials/thread-gates
532        # threadgate object does not appear to differentiate between the types of replies allowed
533        replies_allowed = True if not item["threadgate"] else False
534
535        # Labels (content moderation)
536        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
537        if item["record"].get("labels"):
538            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])])
539
540        # Language
541        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))
542
543        # Missing references
544        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
545            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
546        if quoted_ref:
547            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
548                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})
549
550        # Reference Posts (expanded to include handles during collection)
551        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
552        # Mentions
553        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
554        # Reply to
555        replied_to_post = None
556        replied_to_user = None
557        if item["record"].get("reply"):
558            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
559                replied_to_user = item["4CAT_metadata"]["reply_to"]
560            else:
561                # Use DID, though this will not create a working link
562                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
563            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])
564
565        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
566        # if item["record"].get("entities"):
567        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})
568
569        # Author tags, not hashtags, not seen, very rarely used
570        # if item["record"].get("tags"):
571        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})
572
573        if unmapped_data:
574            # TODO: Use MappedItem message; currently it is not called...
575            config.with_db()
576            config.db.log.warning(f"Bluesky new mappings ({item['uri']}): {unmapped_data}")
577
578        return MappedItem({
579            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
580            "query": item["4CAT_metadata"]["query"],
581            "rank": item["4CAT_metadata"]["rank"],
582            "id": item["uri"],
583            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
584            "created_at": created_at.isoformat(),
585            "author": item["author"]["handle"],
586            "author_id": item["author"]["did"],
587            "body": item["record"]["text"],
588            "link": link,
589            "tags": ",".join(tags),
590            "like_count": item["like_count"],
591            "quote_count": item["quote_count"],
592            "reply_count": item["reply_count"],
593            "repost_count": item["repost_count"],
594            "quoted_post": quoted_link if quoted_link else "",
595            "quoted_user": quoted_user if quoted_user else "",
596            "replied_to_post": replied_to_post if replied_to_post else "",
597            "replied_to_user": replied_to_user if replied_to_user else "",
598            "replies_allowed": replies_allowed,
599            "mentions": ",".join(mentions),
600            "links": ",".join(embeded_links | links),
601            "images": ",".join(embeded_images),
602            "labels": ",".join(labels),
603            "has_poll": has_poll,
604            "languages": languages,
605
606            "author_display_name": item["author"]["display_name"],
607            "author_profile": author_profile,
608            "author_avatar": item["author"]["avatar"],
609            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),
610
611            "timestamp": created_at.timestamp(),
612        }, message=f"Bluesky new mappings: {unmapped_data}")
613
614    @staticmethod
615    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
616        """
617        Bluesky datetime string to datetime object.
618
619        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
620
621        :param str datetime_string:  The datetime string to convert
622        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
623        :param bool raise_error:  Raise error if unable to parse else return datetime_string
624        :return datetime/str:  The converted datetime object
625        """
626        try:
627            datetime_object = parser.isoparse(datetime_string)
628        except ValueError as e:
629            if raise_error:
630                raise e
631            return datetime_string
632        
633        if mode == "datetime":
634            return datetime_object
635        elif mode == "iso_string":
636            return datetime_object.isoformat()
637
638
639    @staticmethod
640    def get_bsky_link(handle, post_id):
641        """
642        Get link to Bluesky post
643        """
644        return f"https://bsky.app/profile/{handle}/post/{post_id}"
645
646    @staticmethod
647    def bsky_get_handle_from_did(client, did):
648        """
649        Get handle from DID
650        """
651        tries = 0
652        while True:
653            try:
654                user_profile = client.app.bsky.actor.get_profile({"actor": did})
655                if user_profile:
656                    return user_profile.handle
657                else:
658                    return None
659            except InvokeTimeoutError as e:
660                # Network error; try again
661                tries += 1
662                time.sleep(1)
663                if tries > 3:
664                    return None
665                continue
666            except BadRequestError as e:
667                if e.response.content.message:
668                    return e.response.content.message
669                return None
670
671    @staticmethod
672    def bsky_login(username, password, session_id):
673        """
674        Login to Bluesky
675
676        :param str username:  Username for Bluesky
677        :param str password:  Password for Bluesky
678        :param str session_id:  Session ID to use for login
679        :return Client:  Client object with login credentials
680        """
681        if not session_id:
682            session_id = SearchBluesky.create_session_id(username, password)
683        elif (not username or not password) and not session_id:
684            raise ValueError("Must provide both username and password or else session_id.")
685
686        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), "bsky_" + session_id + ".session")
687
688        def on_session_change(event: SessionEvent, session: Session) -> None:
689            """
690            Save session to file; atproto session change event handler should keep the session up to date
691
692            https://atproto.blue/en/latest/atproto_client/auth.html
693            """
694            print('Session changed:', event, repr(session))
695            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
696                print('Saving changed session')
697                with session_path.open("w") as session_file:
698                    session_file.write(session.export())
699
700        client = Client()
701        client.on_session_change(on_session_change)
702        if session_path.exists():
703            with session_path.open() as session_file:
704                session_string = session_file.read()
705                client.login(session_string=session_string)
706        else:
707            # Were not able to log in via session string; login with username and password
708            client.login(login=username, password=password)
709        return client
710
711    @staticmethod
712    def create_session_id(username, password):
713        """
714        Generate a filename for the session file
715
716        This is a combination of username and password, but hashed
717        so that one cannot actually derive someone's information.
718
719        :param str username:  Username for Bluesky
720        :param str password:  Password for Bluesky
721        :return str: A hash value derived from the input
722        """
723        hash_base = username.strip() + str(password).strip()
724        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Search for posts in Bluesky

type = 'bsky-search'
category = 'Search'
title = 'Bluesky Search'
description = 'Collects Bluesky posts via its API.'
extension = 'ndjson'
is_local = False
is_static = False
config = {'bsky-search.max_results': {'type': 'string', 'help': 'Maximum results per query', 'coerce_type': <class 'int'>, 'min': 0, 'default': 50000, 'tooltip': "Amount of results (e.g., posts) per query. '0' will allow unlimited."}}
handle_lookup_error_messages = ['account is deactivated', 'profile not found', 'account has been suspended']
@classmethod
def get_options(cls, parent_dataset=None, user=None):
 49    @classmethod
 50    def get_options(cls, parent_dataset=None, user=None):
 51        """
 52        Get processor options
 53
 54        Just updates the description of the entities field based on the
 55        configured max entities.
 56
 57        :param DataSet parent_dataset:  An object representing the dataset that
 58          the processor would be run on
 59        :param User user:  Flask user the options will be displayed for, in
 60          case they are requested for display in the 4CAT web interface. This can
 61          be used to show some options only to privileges users.
 62        """
 63        options = {
 64            "intro": {
 65                "type": UserInput.OPTION_INFO,
 66                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
 67                        "and stored there while data is fetched. After the dataset has been created your credentials "
 68                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
 69            },
 70            "username": {
 71                "type": UserInput.OPTION_TEXT,
 72                "help": "Bluesky Username",
 73                "cache": True,
 74                "sensitive": True,
 75            },
 76            "password": {
 77                "type": UserInput.OPTION_TEXT,
 78                "help": "Bluesky Password",
 79                "cache": True, # tells the frontend to cache this value
 80                "sensitive": True, # tells the backend to delete this value after use
 81                "password": True, # tells the frontend this is a password type
 82            },
 83            "divider": {
 84                "type": UserInput.OPTION_DIVIDER
 85            },
 86            "query": {
 87                "type": UserInput.OPTION_TEXT_LARGE,
 88                "help": "Search Queries",
 89                "tooltip": "Separate with commas or line breaks."
 90            },
 91            "max_posts": {
 92                "type": UserInput.OPTION_TEXT,
 93                "help": "Max posts per query",
 94                "min": 1,
 95                "default": 100
 96            },
 97            "daterange": {
 98                "type": UserInput.OPTION_DATERANGE,
 99                "help": "Date range",
100                "tooltip": "The date range for the search. No date range will search all posts."
101            },
102        }
103
104        # Update the max_posts setting from config
105        max_posts = int(config.get('bsky-search.max_results', 100, user=user))
106        if max_posts == 0:
107            # This is potentially madness
108            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
109            options['max_posts']['min'] = 0
110        else:
111            options["max_posts"]["max"] = max_posts
112            options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts
113
114        return options

Get processor options

Adjusts the limits and default of the max_posts option based on the configured maximum number of results per query.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
  • User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileged users.
@staticmethod
def validate_query(query, request, user):
116    @staticmethod
117    def validate_query(query, request, user):
118        """
119        Validate Bluesky query
120
121        :param dict query:  Query parameters, from client-side.
122        :param request:  Flask request
123        :param User user:  User object of user who has submitted the query
124        :return dict:  Safe query parameters
125        """
126        # no query 4 u
127        if not query.get("query", "").strip():
128            raise QueryParametersException("You must provide a search query.")
129
130        if not query.get("username", None) or not query.get("password", None) :
131            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")
132
133        # Test login credentials
134        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
135        try:
136            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id)
137        except UnauthorizedError as e:
138            raise QueryParametersException("Invalid Bluesky login credentials.")
139        except RequestException as e:
140            if e.response.content.message == 'Rate Limit Exceeded':
141                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
142                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
143            else:
144                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")
145
146        # sanitize query
147        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]
148
149        # the dates need to make sense as a range to search within
150        min_date, max_date = query.get("daterange")
151        if min_date and max_date and min_date > max_date:
152            raise QueryParametersException("The start date must be before the end date.")
153
154        # Only check this if not already confirmed by the frontend
155        posts_per_second = 55 # gathered from simply checking start/end times of logs
156        if not query.get("frontend-confirm"):
157            # Estimate is not returned; use max_posts as a rough estimate
158            max_posts = query.get("max_posts", 100)
159            expected_tweets = query.get("max_posts", 100) * len(sanitized_query)
160            # Warn if process may take more than ~1 hours
161            if expected_tweets > (posts_per_second * 3600):
162                expected_time = timify_long(expected_tweets / posts_per_second)
163                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
164            elif max_posts == 0 and not min_date:
165                raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
166            elif max_posts == 0:
167                raise QueryNeedsExplicitConfirmationException(f"No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")
168
169        return {
170            "max_posts": query.get("max_posts"),
171            "query": ",".join(sanitized_query),
172            "username": query.get("username"),
173            "password": query.get("password"),
174            "session_id": session_id,
175            "min_date": min_date,
176            "max_date": max_date,
177        }

Validate Bluesky query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters

def get_items(self, query):
179    def get_items(self, query):
180        """
181        Execute a query; get messages for given parameters
182
183        Basically a wrapper around execute_queries() to call it with asyncio.
184
185        :param dict query:  Query parameters, as part of the DataSet object
186        :return list:  Posts, sorted by thread and post ID, in ascending order
187        """
188        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
189            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")
190
191        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
192        try:
193            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id)
194        except (UnauthorizedError, RequestException, BadRequestError) as e:
195            self.dataset.log(f"Bluesky login failed: {e}")
196            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")
197
198        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")
199
200        max_posts = query.get("max_posts", 100)
201        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts
202
203        # Handle reference mapping; user references use did instead of dynamic handle
204        did_to_handle = {}
205
206        query_parameters = {
207            "limit": limit,
208        }
209
210        # Add start and end dates if provided
211        if self.parameters.get("min_date"):
212            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
213        if self.parameters.get("max_date"):
214            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
215
216        queries = query.get("query").split(",")
217        num_queries = len(queries)
218        total_posts = 0
219        i = 0
220        last_query = None
221        last_date = None
222        while queries:
223            query = queries.pop(0)
224            if query == last_query:
225                # Check if there are continued posts from the last query
226                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
227                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
228            else:
229                # New query
230                query_post_ids = set()
231                i += 1
232                rank = 0
233                last_query = query
234                last_date = None
235                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
236                query_requests = 0
237
238            query_parameters["q"] = query
239            cursor = None  # Start with no cursor (first page)
240            search_for_invalid_post = False
241            invalid_post_counter = 0
242            while True:
243                if self.interrupted:
244                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
245                # Query posts, including pagination (cursor for next page)
246                tries = 0
247                response = None
248                while tries < 3:
249                    query_parameters["cursor"] = cursor
250                    try:
251                        response = client.app.bsky.feed.search_posts(params=query_parameters)
252                        break
253                    except ModelError as e:
254                        # Post validation error; one post is unable to be read
255                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
256                        # order to collect post by post, invalid post is identified again, we switch back to higher
257                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
258                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
259                        # add the query back to the queue with a new "until" date to continue the query
260                        # https://github.com/bluesky-social/atproto/issues/3446
261                        if not search_for_invalid_post:
262                            # New invalid post, search and skip
263                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
264                            search_for_invalid_post = True
265                            # Currently we must search post by post to find the invalid post
266                            query_parameters["limit"] = 1
267                        else:
268                            # Found invalid post, skip, reset counters
269                            self.dataset.log(
270                                f"Invalid post identified; skipping and continue with query as normal: {e}")
271                            search_for_invalid_post = False
272                            # Reset limit to normal
273                            query_parameters["limit"] = limit
274                            invalid_post_counter = 0
275                            cursor = str(int(cursor) + 1) if cursor else None
276                        # Re-query with new cursor & limit
277                        continue
278
279                    except InvokeTimeoutError as e:
280                        # Timeout error, but can occur for odd queries with no results
281                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
282                        time.sleep(1)
283                        tries += 2
284                        continue
285                    except NetworkError as e:
286                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
287                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
288                        time.sleep(1 + (tries * 10))
289                        queries.insert(0, query)
290                        break
291
292                if not response:
293                    # Expected from NetworkError, but the query will have been added back to the queue
294                    # If not, then there was a problem with the query
295                    if len(queries) == 0:
296                        self.dataset.update_status(f"Error collecting posts from Bluesky; see log for details", is_final=True)
297                    if query not in queries:
298                        # Query was not added back; there was an unexpected issue with the query itself
299                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
300                    break
301
302                query_requests += 1
303                items = response['posts'] if hasattr(response, 'posts') else []
304
305                if search_for_invalid_post:
306                    invalid_post_counter += 1
307                    if invalid_post_counter >= 100:
308                        #  Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
309                        self.dataset.log(f"Unable to identify invalid post; discontinuing search")
310                        query_parameters["limit"] = limit
311                        search_for_invalid_post = False
312                        invalid_post_counter = 0
313
314                    if not items:
315                        # Sometimes no post is returned, but there still may be posts following
316                        self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
317                        # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
318                        cursor = str(int(cursor) + 1) if cursor else None
319                        continue
320
321                new_posts = 0
322                # Handle the posts
323                for item in items:
324                    if 0 < max_posts <= rank:
325                        break
326
327                    if self.interrupted:
328                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
329
330                    post = item.model_dump()
331                    post_id = post["uri"]
332                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
333                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
334                    if post_id in query_post_ids:
335                        # Skip duplicate posts
336                        continue
337
338                    new_posts += 1
339                    query_post_ids.add(post_id)
340
341                    # Add user handles from references
342                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
343                    # Mentions
344                    mentions = []
345                    if post["record"].get("facets"):
346                        for facet in post["record"]["facets"]:
347                            for feature in facet.get("features", {}):
348                                if feature.get("did"):
349                                    if feature["did"] in did_to_handle:
350                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
351                                    else:
352                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
353                                        if handle:
354                                            if handle.lower() in self.handle_lookup_error_messages:
355                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
356                                            mentions.append({"did": feature["did"], "handle": handle})
357                                            did_to_handle[feature["did"]] = handle
358                                        else:
359                                            mentions.append({"did": feature["did"], "handle": None})
360                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
361                    # Reply to
362                    reply_to_handle = None
363                    if post["record"].get("reply"):
364                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
365                        if reply_to_did in did_to_handle:
366                            reply_to_handle = did_to_handle[reply_to_did]
367                        else:
368                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
369                            if handle:
370                                if handle.lower() in self.handle_lookup_error_messages:
371                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
372                                reply_to_handle = handle
373                                did_to_handle[reply_to_did] = handle
374                            else:
375                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")
376
377
378                    post.update({"4CAT_metadata": {
379                        "collected_at": datetime.now().timestamp(),
380                        "query": query,
381                        "rank": rank,
382                        "mentions": mentions,
383                        "reply_to": reply_to_handle if reply_to_handle else None,
384                    }})
385                    rank += 1
386                    yield post
387                    total_posts += 1
388
389                # Check if there is a cursor for the next page
390                cursor = response['cursor']
391                if max_posts != 0:
392                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
393
394                if 0 < max_posts <= rank:
395                    self.dataset.update_status(
396                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
397                    break
398
399                if not cursor:
400                    if new_posts:
401                        # Bluesky API seems to stop around 10000 posts and not return a cursor
402                        # Re-query with the same query to get the next set of posts using last_date (set above)
403                        self.dataset.log(f"Query {query}: {query_requests} requests")
404                        queries.insert(0, query)
405                    else:
406                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
407                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
408
409                    if rank:
410                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
411                    break  # No more pages, stop the loop
412                elif not items:
413                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
414                    break

Execute a query; get messages for given parameters

Basically a wrapper around execute_queries() to call it with asyncio.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Posts, sorted by thread and post ID, in ascending order

@staticmethod
def map_item(item):
416    @staticmethod
417    def map_item(item):
418        """
419        Convert item object to 4CAT-ready data object
420
421        :param dict item:  item to parse
422        :return dict:  4CAT-compatible item object
423        """
424        unmapped_data = []
425
426        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
427        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
428        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"
429
430        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt")))
431
432        # Tags
433        tags = set()
434        links = set()
435        mentions_did = set()
436        has_poll = False
437        if item["record"].get("facets"):
438            for facet in item["record"].get("facets"):
439                for feature in facet.get("features"):
440                    if feature.get("tag"):
441                        tags.add(feature.get("tag"))
442                    elif feature.get("uri"):
443                        links.add(feature.get("uri"))
444                    elif feature.get("did"):
445                        mentions_did.add(feature.get("did"))
446                    elif feature.get("number"):
447                        has_poll = True
448                    else:
449                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
450                if "features" not in facet:
451                    unmapped_data.append({"loc": "record.facets", "obj": facet})
452
453        # Embeds are in both the item and the record; so far these always contain same item
454        embeded_links = set()
455        embeded_images = set()
456        image_references = set()
457        quoted_link = None
458        quoted_user = None
459        quoted_ref = None
460        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
461        while possible_embeds:
462            embed = possible_embeds.pop(0)
463            if not embed:
464                continue
465
466            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
467            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
468                # contains post plus additional media
469                for key, media_ob in embed.items():
470                    possible_embeds.append(media_ob)
471
472            elif "images" in embed: # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
473                for img_ob in embed["images"]:
474                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
475                    if img_link:
476                        embeded_images.add(img_link)
477                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
478                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
479                        # BUT ref has already been obtained in other embeds...
480                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
481                    else:
482                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
483            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
484                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
485                if embed.get("thumbnail"):
486                    embeded_images.add(embed.get("thumbnail"))
487                elif embed.get("video", {}).get("ref", {}).get("link"):
488                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
489                else:
490                    # No thumb for video
491                    pass
492            elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
493                # Quoted post
494                # Note: these may also contain images that would be seen, but are not part of the original post
495                if embed["record"].get("author", embed["record"].get("creator")):
496                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
497                        # User may not be able to see original post
498                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
499                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
500                        else:
501                            # New unknown
502                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
503                    else:
504                        # author seems to be a quoted post while creator a quoted "list"
505                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
506                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
507                elif embed["record"].get("not_found"):
508                    quoted_link = "DELETED"
509                    # We do have the DID, but this information is not normally displayed
510                    # quoted_user = embed["record"]['uri'].split("/")[2]
511                elif embed["record"].get("detached"):
512                    quoted_link = "REMOVED BY AUTHOR"
513                else:
514                    quoted_ref = embed["record"]['uri']
515            elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
516                if embed["external"].get("uri"):
517                    embeded_links.add(embed["external"].get("uri"))
518                if embed["external"].get("thumb"):
519                    if isinstance(embed["external"]["thumb"], str):
520                        embeded_images.add(embed["external"]["thumb"])
521                    else:
522                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
523                else:
524                    # No thumb for link
525                    pass
526            else:
527                unmapped_data.append({"loc": f"embed.{py_type}",
528                                      "obj": embed})
529
530        # Replies allowed
531        # https://docs.bsky.app/docs/tutorials/thread-gates
532        # threadgate object does not appear to differentiate between the types of replies allowed
533        replies_allowed = True if not item["threadgate"] else False
534
535        # Labels (content moderation)
536        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
537        if item["record"].get("labels"):
538            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])])
539
540        # Language
541        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))
542
543        # Missing references
544        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
545            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
546        if quoted_ref:
547            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
548                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})
549
550        # Reference Posts (expanded to include handles during collection)
551        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
552        # Mentions
553        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
554        # Reply to
555        replied_to_post = None
556        replied_to_user = None
557        if item["record"].get("reply"):
558            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
559                replied_to_user = item["4CAT_metadata"]["reply_to"]
560            else:
561                # Use DID, though this will not create a working link
562                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
563            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])
564
565        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
566        # if item["record"].get("entities"):
567        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})
568
569        # Author tags, not hashtags, not seen, very rarely used
570        # if item["record"].get("tags"):
571        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})
572
573        if unmapped_data:
574            # TODO: Use MappedItem message; currently it is not called...
575            config.with_db()
576            config.db.log.warning(f"Bluesky new mappings ({item['uri']}): {unmapped_data}")
577
578        return MappedItem({
579            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
580            "query": item["4CAT_metadata"]["query"],
581            "rank": item["4CAT_metadata"]["rank"],
582            "id": item["uri"],
583            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
584            "created_at": created_at.isoformat(),
585            "author": item["author"]["handle"],
586            "author_id": item["author"]["did"],
587            "body": item["record"]["text"],
588            "link": link,
589            "tags": ",".join(tags),
590            "like_count": item["like_count"],
591            "quote_count": item["quote_count"],
592            "reply_count": item["reply_count"],
593            "repost_count": item["repost_count"],
594            "quoted_post": quoted_link if quoted_link else "",
595            "quoted_user": quoted_user if quoted_user else "",
596            "replied_to_post": replied_to_post if replied_to_post else "",
597            "replied_to_user": replied_to_user if replied_to_user else "",
598            "replies_allowed": replies_allowed,
599            "mentions": ",".join(mentions),
600            "links": ",".join(embeded_links | links),
601            "images": ",".join(embeded_images),
602            "labels": ",".join(labels),
603            "has_poll": has_poll,
604            "languages": languages,
605
606            "author_display_name": item["author"]["display_name"],
607            "author_profile": author_profile,
608            "author_avatar": item["author"]["avatar"],
609            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),
610
611            "timestamp": created_at.timestamp(),
612        }, message=f"Bluesky new mappings: {unmapped_data}")

Convert item object to 4CAT-ready data object

Parameters
  • dict item: item to parse
Returns

4CAT-compatible item object

@staticmethod
def bsky_convert_datetime_string(datetime_string, mode='datetime', raise_error=True):
614    @staticmethod
615    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
616        """
617        Bluesky datetime string to datetime object.
618
619        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
620
621        :param str datetime_string:  The datetime string to convert
622        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
623        :param bool raise_error:  Raise error if unable to parse else return datetime_string
624        :return datetime/str:  The converted datetime object
625        """
626        try:
627            datetime_object = parser.isoparse(datetime_string)
628        except ValueError as e:
629            if raise_error:
630                raise e
631            return datetime_string
632        
633        if mode == "datetime":
634            return datetime_object
635        elif mode == "iso_string":
636            return datetime_object.isoformat()

Convert a Bluesky datetime string to a datetime object.

Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.

Parameters
  • str datetime_string: The datetime string to convert
  • str mode: The mode to return the datetime object in [datetime, iso_string]
  • bool raise_error: Raise error if unable to parse else return datetime_string
Returns

The converted datetime object, or an ISO formatted string when mode is "iso_string"

@staticmethod
def bsky_get_handle_from_did(client, did):
646    @staticmethod
647    def bsky_get_handle_from_did(client, did):
648        """
649        Get handle from DID
650        """
651        tries = 0
652        while True:
653            try:
654                user_profile = client.app.bsky.actor.get_profile({"actor": did})
655                if user_profile:
656                    return user_profile.handle
657                else:
658                    return None
659            except InvokeTimeoutError as e:
660                # Network error; try again
661                tries += 1
662                time.sleep(1)
663                if tries > 3:
664                    return None
665                continue
666            except BadRequestError as e:
667                if e.response.content.message:
668                    return e.response.content.message
669                return None

Get handle from DID

@staticmethod
def bsky_login(username, password, session_id):
671    @staticmethod
672    def bsky_login(username, password, session_id):
673        """
674        Login to Bluesky
675
676        :param str username:  Username for Bluesky
677        :param str password:  Password for Bluesky
678        :param str session_id:  Session ID to use for login
679        :return Client:  Client object with login credentials
680        """
681        if not session_id:
682            session_id = SearchBluesky.create_session_id(username, password)
683        elif (not username or not password) and not session_id:
684            raise ValueError("Must provide both username and password or else session_id.")
685
686        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), "bsky_" + session_id + ".session")
687
688        def on_session_change(event: SessionEvent, session: Session) -> None:
689            """
690            Save session to file; atproto session change event handler should keep the session up to date
691
692            https://atproto.blue/en/latest/atproto_client/auth.html
693            """
694            print('Session changed:', event, repr(session))
695            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
696                print('Saving changed session')
697                with session_path.open("w") as session_file:
698                    session_file.write(session.export())
699
700        client = Client()
701        client.on_session_change(on_session_change)
702        if session_path.exists():
703            with session_path.open() as session_file:
704                session_string = session_file.read()
705                client.login(session_string=session_string)
706        else:
707            # Were not able to log in via session string; login with username and password
708            client.login(login=username, password=password)
709        return client

Login to Bluesky

Parameters
  • str username: Username for Bluesky
  • str password: Password for Bluesky
  • str session_id: Session ID to use for login
Returns

Client object with login credentials

@staticmethod
def create_session_id(username, password):
711    @staticmethod
712    def create_session_id(username, password):
713        """
714        Generate a filename for the session file
715
716        This is a combination of username and password, but hashed
717        so that one cannot actually derive someone's information.
718
719        :param str username:  Username for Bluesky
720        :param str password:  Password for Bluesky
721        :return str: A hash value derived from the input
722        """
723        hash_base = username.strip() + str(password).strip()
724        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Generate a filename for the session file

This is a combination of username and password, but hashed so that one cannot actually derive someone's information.

Parameters
  • str username: Username for Bluesky
  • str password: Password for Bluesky
Returns

A hash value derived from the input