Edit on GitHub

datasources.bsky.search_bsky

Collect Bluesky posts

  1"""
  2Collect Bluesky posts
  3"""
  4import hashlib
  5import time
  6from datetime import datetime
  7
  8from dateutil import parser
  9
 10from atproto import Client, Session, SessionEvent
 11from atproto_client.exceptions import UnauthorizedError, BadRequestError, InvokeTimeoutError, RequestException, \
 12    ModelError, NetworkError
 13
 14from backend.lib.search import Search
 15from common.lib.exceptions import QueryParametersException, QueryNeedsExplicitConfirmationException, \
 16    ProcessorInterruptedException
 17from common.lib.helpers import timify
 18from common.lib.user_input import UserInput
 19from common.lib.item_mapping import MappedItem
 20
 21class SearchBluesky(Search):
 22    """
 23    Search for posts in Bluesky
 24    """
 25    type = "bsky-search"  # job ID
 26    category = "Search"  # category
 27    title = "Bluesky Search"  # title displayed in UI
 28    description = "Collects Bluesky posts via its API."  # description displayed in UI
 29    extension = "ndjson"  # extension of result file, used internally and in UI
 30    is_local = False  # Whether this datasource is locally scraped
 31    is_static = False  # Whether this datasource is still updated
 32
 33    config = {
 34        "bsky-search.max_results": {
 35            "type": UserInput.OPTION_TEXT,
 36            "help": "Maximum results per query",
 37            "coerce_type": int,
 38            "min": 0,
 39            "default": 50000,
 40            "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited."
 41        }
 42    }
 43
 44    handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"]
 45
 46    @classmethod
 47    def get_options(cls, parent_dataset=None, config=None):
 48        """
 49        Get processor options
 50
 51        Just updates the description of the entities field based on the
 52        configured max entities.
 53
 54        :param DataSet parent_dataset:  An object representing the dataset that
 55          the processor would be run on
 56        :param User user:  Flask user the options will be displayed for, in
 57          case they are requested for display in the 4CAT web interface. This can
 58          be used to show some options only to privileges users.
 59        """
 60        options = {
 61            "intro": {
 62                "type": UserInput.OPTION_INFO,
 63                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
 64                        "and stored there while data is fetched. After the dataset has been created your credentials "
 65                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
 66            },
 67            "username": {
 68                "type": UserInput.OPTION_TEXT,
 69                "help": "Bluesky Username",
 70                "cache": True,
 71                "sensitive": True,
 72                "tooltip": "If no server is specified, .bsky.social is used."
 73            },
 74            "password": {
 75                "type": UserInput.OPTION_TEXT,
 76                "help": "Bluesky Password",
 77                "cache": True, # tells the frontend to cache this value
 78                "sensitive": True, # tells the backend to delete this value after use
 79                "password": True, # tells the frontend this is a password type
 80            },
 81            "divider": {
 82                "type": UserInput.OPTION_DIVIDER
 83            },
 84            "query": {
 85                "type": UserInput.OPTION_TEXT_LARGE,
 86                "help": "Search Queries",
 87                "tooltip": "Separate with commas or line breaks."
 88            },
 89            "max_posts": {
 90                "type": UserInput.OPTION_TEXT,
 91                "help": "Max posts per query",
 92                "min": 1,
 93                "default": 100
 94            },
 95            "daterange": {
 96                "type": UserInput.OPTION_DATERANGE,
 97                "help": "Date range",
 98                "tooltip": "The date range for the search. No date range will search all posts."
 99            },
100        }
101
102        # Update the max_posts setting from config
103        max_posts = int(config.get('bsky-search.max_results', default=100))
104        if max_posts == 0:
105            # This is potentially madness
106            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
107            options['max_posts']['min'] = 0
108        else:
109            options["max_posts"]["max"] = max_posts
110            options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts
111
112        return options
113
114    @staticmethod
115    def validate_query(query, request, config):
116        """
117        Validate Bluesky query
118
119        :param dict query:  Query parameters, from client-side.
120        :param request:  Flask request
121        :param User user:  User object of user who has submitted the query
122        :return dict:  Safe query parameters
123        """
124        # no query 4 u
125        if not query.get("query", "").strip():
126            raise QueryParametersException("You must provide a search query.")
127
128        if not query.get("username", None) or not query.get("password", None) :
129            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")
130
131        # If no server is specified, default to .bsky.social
132        if "." not in query.get("username"):
133            query["username"] += ".bsky.social"
134        # Remove @ at the start
135        if query.get("username").startswith("@"):
136            query["username"] = query["username"][1:]
137
138        # Test login credentials
139        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
140        try:
141            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id, session_directory=config.get("PATH_ROOT").joinpath(config.get("PATH_SESSIONS")))
142        except UnauthorizedError:
143            raise QueryParametersException("Invalid Bluesky login credentials.")
144        except RequestException as e:
145            if e.response.content.message == 'Rate Limit Exceeded':
146                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
147                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
148            else:
149                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")
150
151        # sanitize query
152        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]
153
154        # the dates need to make sense as a range to search within
155        min_date, max_date = query.get("daterange")
156        if min_date and max_date and min_date > max_date:
157            raise QueryParametersException("The start date must be before the end date.")
158
159        # Only check this if not already confirmed by the frontend
160        posts_per_second = 55 # gathered from simply checking start/end times of logs
161        if not query.get("frontend-confirm"):
162            # Estimate is not returned; use max_posts as a rough estimate
163            max_posts = query.get("max_posts", 100)
164            expected_tweets = query.get("max_posts", 100) * len(sanitized_query)
165            # Warn if process may take more than ~1 hours
166            if expected_tweets > (posts_per_second * 3600):
167                expected_time = timify(expected_tweets / posts_per_second)
168                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
169            elif max_posts == 0 and not min_date:
170                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
171            elif max_posts == 0:
172                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")
173
174        return {
175            "max_posts": query.get("max_posts"),
176            "query": ",".join(sanitized_query),
177            "username": query.get("username"),
178            "password": query.get("password"),
179            "session_id": session_id,
180            "min_date": min_date,
181            "max_date": max_date,
182        }
183
184    def get_items(self, query):
185        """
186        Execute a query; get messages for given parameters
187
188        Basically a wrapper around execute_queries() to call it with asyncio.
189
190        :param dict query:  Query parameters, as part of the DataSet object
191        :return list:  Posts, sorted by thread and post ID, in ascending order
192        """
193        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
194            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")
195
196        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
197        try:
198            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id, session_directory=self.config.get("PATH_ROOT").joinpath(self.config.get("PATH_SESSIONS")))
199        except (UnauthorizedError, RequestException, BadRequestError) as e:
200            self.dataset.log(f"Bluesky login failed: {e}")
201            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")
202
203        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")
204
205        max_posts = query.get("max_posts", 100)
206        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts
207
208        # Handle reference mapping; user references use did instead of dynamic handle
209        did_to_handle = {}
210
211        query_parameters = {
212            "limit": limit,
213        }
214
215        # Add start and end dates if provided
216        if self.parameters.get("min_date"):
217            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
218        if self.parameters.get("max_date"):
219            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
220
221        queries = query.get("query").split(",")
222        num_queries = len(queries)
223        total_posts = 0
224        i = 0
225        last_query = None
226        last_date = None
227        while queries:
228            query = queries.pop(0)
229            if query == last_query:
230                # Check if there are continued posts from the last query
231                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
232                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
233            else:
234                # New query
235                query_post_ids = set()
236                i += 1
237                rank = 0
238                last_query = query
239                last_date = None
240                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
241                query_requests = 0
242
243            query_parameters["q"] = query
244            cursor = None  # Start with no cursor (first page)
245            search_for_invalid_post = False
246            invalid_post_counter = 0
247            while True:
248                if self.interrupted:
249                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
250                # Query posts, including pagination (cursor for next page)
251                tries = 0
252                response = None
253                while tries < 3:
254                    query_parameters["cursor"] = cursor
255                    try:
256                        response = client.app.bsky.feed.search_posts(params=query_parameters)
257                        break
258                    except ModelError as e:
259                        # Post validation error; one post is unable to be read
260                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
261                        # order to collect post by post, invalid post is identified again, we switch back to higher
262                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
263                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
264                        # add the query back to the queue with a new "until" date to continue the query
265                        # https://github.com/bluesky-social/atproto/issues/3446
266                        if not search_for_invalid_post:
267                            # New invalid post, search and skip
268                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
269                            search_for_invalid_post = True
270                            # Currently we must search post by post to find the invalid post
271                            query_parameters["limit"] = 1
272                        else:
273                            # Found invalid post, skip, reset counters
274                            self.dataset.log(
275                                f"Invalid post identified; skipping and continue with query as normal: {e}")
276                            search_for_invalid_post = False
277                            # Reset limit to normal
278                            query_parameters["limit"] = limit
279                            invalid_post_counter = 0
280                            cursor = str(int(cursor) + 1) if cursor else None
281                        # Re-query with new cursor & limit
282                        continue
283
284                    except InvokeTimeoutError as e:
285                        # Timeout error, but can occur for odd queries with no results
286                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
287                        time.sleep(1)
288                        tries += 2
289                        continue
290                    except NetworkError as e:
291                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
292                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
293                        time.sleep(1 + (tries * 10))
294                        queries.insert(0, query)
295                        break
296
297                if not response:
298                    # Expected from NetworkError, but the query will have been added back to the queue
299                    # If not, then there was a problem with the query
300                    if len(queries) == 0:
301                        self.dataset.update_status("Error collecting posts from Bluesky; see log for details", is_final=True)
302                    if query not in queries:
303                        # Query was not added back; there was an unexpected issue with the query itself
304                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
305                    break
306
307                query_requests += 1
308                items = response['posts'] if hasattr(response, 'posts') else []
309
310                if search_for_invalid_post:
311                    invalid_post_counter += 1
312                    if invalid_post_counter >= 100:
313                        #  Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
314                        self.dataset.log("Unable to identify invalid post; discontinuing search")
315                        query_parameters["limit"] = limit
316                        search_for_invalid_post = False
317                        invalid_post_counter = 0
318
319                    if not items:
320                        # Sometimes no post is returned, but there still may be posts following
321                        self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
322                        # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
323                        cursor = str(int(cursor) + 1) if cursor else None
324                        continue
325
326                new_posts = 0
327                # Handle the posts
328                for item in items:
329                    if 0 < max_posts <= rank:
330                        break
331
332                    if self.interrupted:
333                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
334
335                    post = item.model_dump()
336                    post_id = post["uri"]
337                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
338                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
339                    if post_id in query_post_ids:
340                        # Skip duplicate posts
341                        continue
342
343                    new_posts += 1
344                    query_post_ids.add(post_id)
345
346                    # Add user handles from references
347                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
348                    # Mentions
349                    mentions = []
350                    if post["record"].get("facets"):
351                        for facet in post["record"]["facets"]:
352                            for feature in facet.get("features", {}):
353                                if feature.get("did"):
354                                    if feature["did"] in did_to_handle:
355                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
356                                    else:
357                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
358                                        if handle:
359                                            if handle.lower() in self.handle_lookup_error_messages:
360                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
361                                            mentions.append({"did": feature["did"], "handle": handle})
362                                            did_to_handle[feature["did"]] = handle
363                                        else:
364                                            mentions.append({"did": feature["did"], "handle": None})
365                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
366                    # Reply to
367                    reply_to_handle = None
368                    if post["record"].get("reply"):
369                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
370                        if reply_to_did in did_to_handle:
371                            reply_to_handle = did_to_handle[reply_to_did]
372                        else:
373                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
374                            if handle:
375                                if handle.lower() in self.handle_lookup_error_messages:
376                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
377                                reply_to_handle = handle
378                                did_to_handle[reply_to_did] = handle
379                            else:
380                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")
381
382
383                    post.update({"4CAT_metadata": {
384                        "collected_at": datetime.now().timestamp(),
385                        "query": query,
386                        "rank": rank,
387                        "mentions": mentions,
388                        "reply_to": reply_to_handle if reply_to_handle else None,
389                    }})
390                    rank += 1
391                    yield post
392                    total_posts += 1
393
394                # Check if there is a cursor for the next page
395                cursor = response['cursor']                
396                if max_posts != 0 and rank % (max_posts // 10) == 0:
397                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected out of {max_posts}")
398                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
399                elif max_posts == 0 and rank % 1000 == 0:
400                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected")
401
402                if 0 < max_posts <= rank:
403                    self.dataset.update_status(
404                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
405                    break
406
407                if not cursor:
408                    if new_posts:
409                        # Bluesky API seems to stop around 10000 posts and not return a cursor
410                        # Re-query with the same query to get the next set of posts using last_date (set above)
411                        self.dataset.log(f"Query {query}: {query_requests} requests")
412                        queries.insert(0, query)
413                    else:
414                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
415                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
416
417                    if rank:
418                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
419                    break  # No more pages, stop the loop
420                elif not items:
421                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
422                    break
423
424    @staticmethod
425    def map_item(item):
426        """
427        Convert item object to 4CAT-ready data object
428
429        :param dict item:  item to parse
430        :return dict:  4CAT-compatible item object
431        """
432        unmapped_data = []
433
434        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
435        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
436        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"
437
438        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt")))
439
440        # Tags
441        tags = set()
442        links = set()
443        mentions_did = set()
444        has_poll = False
445        if item["record"].get("facets"):
446            for facet in item["record"].get("facets"):
447                for feature in facet.get("features"):
448                    if feature.get("tag"):
449                        tags.add(feature.get("tag"))
450                    elif feature.get("uri"):
451                        links.add(feature.get("uri"))
452                    elif feature.get("did"):
453                        mentions_did.add(feature.get("did"))
454                    elif feature.get("number"):
455                        has_poll = True
456                    else:
457                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
458                if "features" not in facet:
459                    unmapped_data.append({"loc": "record.facets", "obj": facet})
460
461        # Embeds are in both the item and the record; so far these always contain same item
462        embeded_links = set()
463        embeded_images = set()
464        image_references = set()
465        quoted_link = None
466        quoted_user = None
467        quoted_ref = None
468        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
469        while possible_embeds:
470            embed = possible_embeds.pop(0)
471            if not embed:
472                continue
473
474            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
475            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
476                # contains post plus additional media
477                for key, media_ob in embed.items():
478                    possible_embeds.append(media_ob)
479
480            elif "images" in embed: # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
481                for img_ob in embed["images"]:
482                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
483                    if img_link:
484                        embeded_images.add(img_link)
485                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
486                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
487                        # BUT ref has already been obtained in other embeds...
488                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
489                    else:
490                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
491            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
492                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
493                if embed.get("thumbnail"):
494                    embeded_images.add(embed.get("thumbnail"))
495                elif embed.get("video", {}).get("ref", {}).get("link"):
496                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
497                else:
498                    # No thumb for video
499                    pass
500            elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
501                # Quoted post
502                # Note: these may also contain images that would be seen, but are not part of the original post
503                if embed["record"].get("author", embed["record"].get("creator")):
504                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
505                        # User may not be able to see original post
506                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
507                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
508                        else:
509                            # New unknown
510                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
511                    else:
512                        # author seems to be a quoted post while creator a quoted "list"
513                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
514                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
515                elif embed["record"].get("not_found"):
516                    quoted_link = "DELETED"
517                    # We do have the DID, but this information is not normally displayed
518                    # quoted_user = embed["record"]['uri'].split("/")[2]
519                elif embed["record"].get("detached"):
520                    quoted_link = "REMOVED BY AUTHOR"
521                else:
522                    quoted_ref = embed["record"]['uri']
523            elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
524                if embed["external"].get("uri"):
525                    embeded_links.add(embed["external"].get("uri"))
526                if embed["external"].get("thumb"):
527                    if isinstance(embed["external"]["thumb"], str):
528                        embeded_images.add(embed["external"]["thumb"])
529                    else:
530                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
531                else:
532                    # No thumb for link
533                    pass
534            else:
535                unmapped_data.append({"loc": f"embed.{py_type}",
536                                      "obj": embed})
537
538        # Replies allowed
539        # https://docs.bsky.app/docs/tutorials/thread-gates
540        # threadgate object does not appear to differentiate between the types of replies allowed
541        replies_allowed = True if not item["threadgate"] else False
542
543        # Labels (content moderation)
544        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
545        if item["record"].get("labels"):
546            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])])
547
548        # Language
549        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))
550
551        # Missing references
552        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
553            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
554        if quoted_ref:
555            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
556                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})
557
558        # Reference Posts (expanded to include handles during collection)
559        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
560        # Mentions
561        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
562        # Reply to
563        replied_to_post = None
564        replied_to_user = None
565        if item["record"].get("reply"):
566            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
567                replied_to_user = item["4CAT_metadata"]["reply_to"]
568            else:
569                # Use DID, though this will not create a working link
570                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
571            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])
572
573        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
574        # if item["record"].get("entities"):
575        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})
576
577        # Author tags, not hashtags, not seen, very rarely used
578        # if item["record"].get("tags"):
579        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})
580
581        return MappedItem({
582            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
583            "query": item["4CAT_metadata"]["query"],
584            "rank": item["4CAT_metadata"]["rank"],
585            "id": item["uri"],
586            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
587            "created_at": created_at.isoformat(),
588            "author": item["author"]["handle"],
589            "author_id": item["author"]["did"],
590            "body": item["record"]["text"],
591            "link": link,
592            "tags": ",".join(tags),
593            "like_count": item["like_count"],
594            "quote_count": item["quote_count"],
595            "reply_count": item["reply_count"],
596            "repost_count": item["repost_count"],
597            "quoted_post": quoted_link if quoted_link else "",
598            "quoted_user": quoted_user if quoted_user else "",
599            "replied_to_post": replied_to_post if replied_to_post else "",
600            "replied_to_user": replied_to_user if replied_to_user else "",
601            "replies_allowed": replies_allowed,
602            "mentions": ",".join(mentions),
603            "links": ",".join(embeded_links | links),
604            "images": ",".join(embeded_images),
605            "labels": ",".join(labels),
606            "has_poll": has_poll,
607            "languages": languages,
608
609            "author_display_name": item["author"]["display_name"],
610            "author_profile": author_profile,
611            "author_avatar": item["author"]["avatar"],
612            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),
613
614            "timestamp": int(created_at.timestamp()),
615        }, message=f"Bluesky new mappings: {unmapped_data}")
616
617    @staticmethod
618    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
619        """
620        Bluesky datetime string to datetime object.
621
622        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
623
624        :param str datetime_string:  The datetime string to convert
625        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
626        :param bool raise_error:  Raise error if unable to parse else return datetime_string
627        :return datetime/str:  The converted datetime object
628        """
629        try:
630            datetime_object = parser.isoparse(datetime_string)
631        except ValueError as e:
632            if raise_error:
633                raise e
634            return datetime_string
635        
636        if mode == "datetime":
637            return datetime_object
638        elif mode == "iso_string":
639            return datetime_object.isoformat()
640
641
642    @staticmethod
643    def get_bsky_link(handle, post_id):
644        """
645        Get link to Bluesky post
646        """
647        return f"https://bsky.app/profile/{handle}/post/{post_id}"
648
649    @staticmethod
650    def bsky_get_handle_from_did(client, did):
651        """
652        Get handle from DID
653        """
654        tries = 0
655        while True:
656            try:
657                user_profile = client.app.bsky.actor.get_profile({"actor": did})
658                if user_profile:
659                    return user_profile.handle
660                else:
661                    return None
662            except (NetworkError, InvokeTimeoutError):
663                # Network error; try again
664                tries += 1
665                time.sleep(1)
666                if tries > 3:
667                    return None
668                continue
669            except BadRequestError as e:
670                if e.response.content.message:
671                    return e.response.content.message
672                return None
673
674    @staticmethod
675    def bsky_login(username, password, session_id, session_directory):
676        """
677        Login to Bluesky
678
679        :param str username:  Username for Bluesky
680        :param str password:  Password for Bluesky
681        :param str session_id:  Session ID to use for login
682        :param Path session_directory:  Directory to save the session file
683        :return Client:  Client object with login credentials
684        """
685        if not session_id:
686            session_id = SearchBluesky.create_session_id(username, password)
687        elif (not username or not password) and not session_id:
688            raise ValueError("Must provide both username and password or else session_id.")
689
690        session_path = session_directory.joinpath("bsky_" + session_id + ".session")
691
692        def on_session_change(event: SessionEvent, session: Session) -> None:
693            """
694            Save session to file; atproto session change event handler should keep the session up to date
695
696            https://atproto.blue/en/latest/atproto_client/auth.html
697            """
698            print('Session changed:', event, repr(session))
699            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
700                print('Saving changed session')
701                with session_path.open("w") as session_file:
702                    session_file.write(session.export())
703
704        client = Client()
705        client.on_session_change(on_session_change)
706        if session_path.exists():
707            with session_path.open() as session_file:
708                session_string = session_file.read()
709                try:
710                    client.login(session_string=session_string)
711                except BadRequestError as e:
712                    if e.response.content.message == 'Token has expired':
713                        # Token has expired; try to refresh
714                        if username and password:
715                            client.login(login=username, password=password)
716                        else:
717                            raise ValueError("Session token has expired; please re-login with username and password.")
718        else:
719            # Were not able to log in via session string; login with username and password
720            client.login(login=username, password=password)
721        return client
722
723    @staticmethod
724    def create_session_id(username, password):
725        """
726        Generate a filename for the session file
727
728        This is a combination of username and password, but hashed
729        so that one cannot actually derive someone's information.
730
731        :param str username:  Username for Bluesky
732        :param str password:  Password for Bluesky
733        :return str: A hash value derived from the input
734        """
735        hash_base = username.strip() + str(password).strip()
736        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
class SearchBluesky(backend.lib.search.Search):
 22class SearchBluesky(Search):
 23    """
 24    Search for posts in Bluesky
 25    """
 26    type = "bsky-search"  # job ID
 27    category = "Search"  # category
 28    title = "Bluesky Search"  # title displayed in UI
 29    description = "Collects Bluesky posts via its API."  # description displayed in UI
 30    extension = "ndjson"  # extension of result file, used internally and in UI
 31    is_local = False  # Whether this datasource is locally scraped
 32    is_static = False  # Whether this datasource is still updated
 33
 34    config = {
 35        "bsky-search.max_results": {
 36            "type": UserInput.OPTION_TEXT,
 37            "help": "Maximum results per query",
 38            "coerce_type": int,
 39            "min": 0,
 40            "default": 50000,
 41            "tooltip": "Amount of results (e.g., posts) per query. '0' will allow unlimited."
 42        }
 43    }
 44
 45    handle_lookup_error_messages = ['account is deactivated', "profile not found", "account has been suspended"]
 46
 47    @classmethod
 48    def get_options(cls, parent_dataset=None, config=None):
 49        """
 50        Get processor options
 51
 52        Just updates the description of the entities field based on the
 53        configured max entities.
 54
 55        :param DataSet parent_dataset:  An object representing the dataset that
 56          the processor would be run on
 57        :param User user:  Flask user the options will be displayed for, in
 58          case they are requested for display in the 4CAT web interface. This can
 59          be used to show some options only to privileges users.
 60        """
 61        options = {
 62            "intro": {
 63                "type": UserInput.OPTION_INFO,
 64                "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
 65                        "and stored there while data is fetched. After the dataset has been created your credentials "
 66                        "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
 67            },
 68            "username": {
 69                "type": UserInput.OPTION_TEXT,
 70                "help": "Bluesky Username",
 71                "cache": True,
 72                "sensitive": True,
 73                "tooltip": "If no server is specified, .bsky.social is used."
 74            },
 75            "password": {
 76                "type": UserInput.OPTION_TEXT,
 77                "help": "Bluesky Password",
 78                "cache": True, # tells the frontend to cache this value
 79                "sensitive": True, # tells the backend to delete this value after use
 80                "password": True, # tells the frontend this is a password type
 81            },
 82            "divider": {
 83                "type": UserInput.OPTION_DIVIDER
 84            },
 85            "query": {
 86                "type": UserInput.OPTION_TEXT_LARGE,
 87                "help": "Search Queries",
 88                "tooltip": "Separate with commas or line breaks."
 89            },
 90            "max_posts": {
 91                "type": UserInput.OPTION_TEXT,
 92                "help": "Max posts per query",
 93                "min": 1,
 94                "default": 100
 95            },
 96            "daterange": {
 97                "type": UserInput.OPTION_DATERANGE,
 98                "help": "Date range",
 99                "tooltip": "The date range for the search. No date range will search all posts."
100            },
101        }
102
103        # Update the max_posts setting from config
104        max_posts = int(config.get('bsky-search.max_results', default=100))
105        if max_posts == 0:
106            # This is potentially madness
107            options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
108            options['max_posts']['min'] = 0
109        else:
110            options["max_posts"]["max"] = max_posts
111            options['max_posts']['default'] = options['max_posts']['default'] if options['max_posts']['default'] <= max_posts else max_posts
112
113        return options
114
115    @staticmethod
116    def validate_query(query, request, config):
117        """
118        Validate Bluesky query
119
120        :param dict query:  Query parameters, from client-side.
121        :param request:  Flask request
122        :param User user:  User object of user who has submitted the query
123        :return dict:  Safe query parameters
124        """
125        # no query 4 u
126        if not query.get("query", "").strip():
127            raise QueryParametersException("You must provide a search query.")
128
129        if not query.get("username", None) or not query.get("password", None) :
130            raise QueryParametersException("You need to provide valid Bluesky login credentials first.")
131
132        # If no server is specified, default to .bsky.social
133        if "." not in query.get("username"):
134            query["username"] += ".bsky.social"
135        # Remove @ at the start
136        if query.get("username").startswith("@"):
137            query["username"] = query["username"][1:]
138
139        # Test login credentials
140        session_id = SearchBluesky.create_session_id(query["username"], query["password"])
141        try:
142            SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id, session_directory=config.get("PATH_ROOT").joinpath(config.get("PATH_SESSIONS")))
143        except UnauthorizedError:
144            raise QueryParametersException("Invalid Bluesky login credentials.")
145        except RequestException as e:
146            if e.response.content.message == 'Rate Limit Exceeded':
147                lifted_at = datetime.fromtimestamp(int(e.response.headers["ratelimit-reset"]))
148                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.")
149            else:
150                raise QueryParametersException(f"Bluesky login failed. {e.response.content.message}")
151
152        # sanitize query
153        sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]
154
155        # the dates need to make sense as a range to search within
156        min_date, max_date = query.get("daterange")
157        if min_date and max_date and min_date > max_date:
158            raise QueryParametersException("The start date must be before the end date.")
159
160        # Only check this if not already confirmed by the frontend
161        posts_per_second = 55 # gathered from simply checking start/end times of logs
162        if not query.get("frontend-confirm"):
163            # Estimate is not returned; use max_posts as a rough estimate
164            max_posts = query.get("max_posts", 100)
165            expected_tweets = query.get("max_posts", 100) * len(sanitized_query)
166            # Warn if process may take more than ~1 hours
167            if expected_tweets > (posts_per_second * 3600):
168                expected_time = timify(expected_tweets / posts_per_second)
169                raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_tweets} tweets and may take {expected_time} to complete. Do you want to continue?")
170            elif max_posts == 0 and not min_date:
171                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
172            elif max_posts == 0:
173                raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")
174
175        return {
176            "max_posts": query.get("max_posts"),
177            "query": ",".join(sanitized_query),
178            "username": query.get("username"),
179            "password": query.get("password"),
180            "session_id": session_id,
181            "min_date": min_date,
182            "max_date": max_date,
183        }
184
185    def get_items(self, query):
186        """
187        Execute a query; get messages for given parameters
188
189        Basically a wrapper around execute_queries() to call it with asyncio.
190
191        :param dict query:  Query parameters, as part of the DataSet object
192        :return list:  Posts, sorted by thread and post ID, in ascending order
193        """
194        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
195            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")
196
197        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
198        try:
199            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id, session_directory=self.config.get("PATH_ROOT").joinpath(self.config.get("PATH_SESSIONS")))
200        except (UnauthorizedError, RequestException, BadRequestError) as e:
201            self.dataset.log(f"Bluesky login failed: {e}")
202            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")
203
204        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")
205
206        max_posts = query.get("max_posts", 100)
207        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts
208
209        # Handle reference mapping; user references use did instead of dynamic handle
210        did_to_handle = {}
211
212        query_parameters = {
213            "limit": limit,
214        }
215
216        # Add start and end dates if provided
217        if self.parameters.get("min_date"):
218            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
219        if self.parameters.get("max_date"):
220            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
221
222        queries = query.get("query").split(",")
223        num_queries = len(queries)
224        total_posts = 0
225        i = 0
226        last_query = None
227        last_date = None
228        while queries:
229            query = queries.pop(0)
230            if query == last_query:
231                # Check if there are continued posts from the last query
232                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
233                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
234            else:
235                # New query
236                query_post_ids = set()
237                i += 1
238                rank = 0
239                last_query = query
240                last_date = None
241                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
242                query_requests = 0
243
244            query_parameters["q"] = query
245            cursor = None  # Start with no cursor (first page)
246            search_for_invalid_post = False
247            invalid_post_counter = 0
248            while True:
249                if self.interrupted:
250                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
251                # Query posts, including pagination (cursor for next page)
252                tries = 0
253                response = None
254                while tries < 3:
255                    query_parameters["cursor"] = cursor
256                    try:
257                        response = client.app.bsky.feed.search_posts(params=query_parameters)
258                        break
259                    except ModelError as e:
260                        # Post validation error; one post is unable to be read
261                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
262                        # order to collect post by post, invalid post is identified again, we switch back to higher
263                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
264                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
265                        # add the query back to the queue with a new "until" date to continue the query
266                        # https://github.com/bluesky-social/atproto/issues/3446
267                        if not search_for_invalid_post:
268                            # New invalid post, search and skip
269                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
270                            search_for_invalid_post = True
271                            # Currently we must search post by post to find the invalid post
272                            query_parameters["limit"] = 1
273                        else:
274                            # Found invalid post, skip, reset counters
275                            self.dataset.log(
276                                f"Invalid post identified; skipping and continue with query as normal: {e}")
277                            search_for_invalid_post = False
278                            # Reset limit to normal
279                            query_parameters["limit"] = limit
280                            invalid_post_counter = 0
281                            cursor = str(int(cursor) + 1) if cursor else None
282                        # Re-query with new cursor & limit
283                        continue
284
285                    except InvokeTimeoutError as e:
286                        # Timeout error, but can occur for odd queries with no results
287                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
288                        time.sleep(1)
289                        tries += 2
290                        continue
291                    except NetworkError as e:
292                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
293                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
294                        time.sleep(1 + (tries * 10))
295                        queries.insert(0, query)
296                        break
297
298                if not response:
299                    # Expected from NetworkError, but the query will have been added back to the queue
300                    # If not, then there was a problem with the query
301                    if len(queries) == 0:
302                        self.dataset.update_status("Error collecting posts from Bluesky; see log for details", is_final=True)
303                    if query not in queries:
304                        # Query was not added back; there was an unexpected issue with the query itself
305                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
306                    break
307
308                query_requests += 1
309                items = response['posts'] if hasattr(response, 'posts') else []
310
311                if search_for_invalid_post:
312                    invalid_post_counter += 1
313                    if invalid_post_counter >= 100:
314                        #  Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
315                        self.dataset.log("Unable to identify invalid post; discontinuing search")
316                        query_parameters["limit"] = limit
317                        search_for_invalid_post = False
318                        invalid_post_counter = 0
319
320                    if not items:
321                        # Sometimes no post is returned, but there still may be posts following
322                        self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
323                        # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
324                        cursor = str(int(cursor) + 1) if cursor else None
325                        continue
326
327                new_posts = 0
328                # Handle the posts
329                for item in items:
330                    if 0 < max_posts <= rank:
331                        break
332
333                    if self.interrupted:
334                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
335
336                    post = item.model_dump()
337                    post_id = post["uri"]
338                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
339                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
340                    if post_id in query_post_ids:
341                        # Skip duplicate posts
342                        continue
343
344                    new_posts += 1
345                    query_post_ids.add(post_id)
346
347                    # Add user handles from references
348                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
349                    # Mentions
350                    mentions = []
351                    if post["record"].get("facets"):
352                        for facet in post["record"]["facets"]:
353                            for feature in facet.get("features", {}):
354                                if feature.get("did"):
355                                    if feature["did"] in did_to_handle:
356                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
357                                    else:
358                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
359                                        if handle:
360                                            if handle.lower() in self.handle_lookup_error_messages:
361                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
362                                            mentions.append({"did": feature["did"], "handle": handle})
363                                            did_to_handle[feature["did"]] = handle
364                                        else:
365                                            mentions.append({"did": feature["did"], "handle": None})
366                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
367                    # Reply to
368                    reply_to_handle = None
369                    if post["record"].get("reply"):
370                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
371                        if reply_to_did in did_to_handle:
372                            reply_to_handle = did_to_handle[reply_to_did]
373                        else:
374                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
375                            if handle:
376                                if handle.lower() in self.handle_lookup_error_messages:
377                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
378                                reply_to_handle = handle
379                                did_to_handle[reply_to_did] = handle
380                            else:
381                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")
382
383
384                    post.update({"4CAT_metadata": {
385                        "collected_at": datetime.now().timestamp(),
386                        "query": query,
387                        "rank": rank,
388                        "mentions": mentions,
389                        "reply_to": reply_to_handle if reply_to_handle else None,
390                    }})
391                    rank += 1
392                    yield post
393                    total_posts += 1
394
395                # Check if there is a cursor for the next page
396                cursor = response['cursor']                
397                if max_posts != 0 and rank % (max_posts // 10) == 0:
398                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected out of {max_posts}")
399                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
400                elif max_posts == 0 and rank % 1000 == 0:
401                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected")
402
403                if 0 < max_posts <= rank:
404                    self.dataset.update_status(
405                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
406                    break
407
408                if not cursor:
409                    if new_posts:
410                        # Bluesky API seems to stop around 10000 posts and not return a cursor
411                        # Re-query with the same query to get the next set of posts using last_date (set above)
412                        self.dataset.log(f"Query {query}: {query_requests} requests")
413                        queries.insert(0, query)
414                    else:
415                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
416                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
417
418                    if rank:
419                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
420                    break  # No more pages, stop the loop
421                elif not items:
422                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
423                    break
424
@staticmethod
def map_item(item):
    """
    Convert item object to 4CAT-ready data object

    The item itself is left untouched: embeds are shallow-copied before
    their type marker (``py_type`` or ``$type``) is removed, so the same
    item can safely be mapped more than once.

    :param dict item:  item to parse, as stored by this datasource (a
      dumped post plus a ``4CAT_metadata`` dict)
    :return MappedItem:  4CAT-compatible item object; data that could not
      be mapped is reported in the item's message
    """
    unmapped_data = []
    post_record = item["record"]

    # Add link to post; this is a Bluesky-specific URL and may not always be accurate
    link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
    author_profile = f"https://bsky.app/profile/{item['author']['handle']}"

    created_at = SearchBluesky.bsky_convert_datetime_string(
        post_record.get("created_at", post_record.get("createdAt")))

    # Facets annotate slices of the text: hashtags, links, mentions (by DID)
    tags = set()
    links = set()
    has_poll = False
    for facet in post_record.get("facets") or []:
        # A facet without features is reported below instead of crashing here
        for feature in facet.get("features") or []:
            if feature.get("tag"):
                tags.add(feature.get("tag"))
            elif feature.get("uri"):
                links.add(feature.get("uri"))
            elif feature.get("did"):
                # Mentions are taken from 4CAT_metadata["mentions"] below,
                # which carries the handles resolved during collection
                pass
            elif feature.get("number"):
                has_poll = True
            else:
                unmapped_data.append({"loc": "record.facets.features", "obj": feature})
        if "features" not in facet:
            unmapped_data.append({"loc": "record.facets", "obj": facet})

    # Embeds are in both the item and the record; so far these always contain same item
    embeded_links = set()
    embeded_images = set()
    image_references = set()
    quoted_link = None
    quoted_user = None
    quoted_ref = None
    possible_embeds = [item.get("embed", {}), post_record.get("embed", {})]
    while possible_embeds:
        embed = possible_embeds.pop(0)
        if not embed:
            continue

        # Shallow copy, so removing the type marker does not modify the item
        embed = dict(embed)
        py_type = embed.pop("py_type") if "py_type" in embed else embed.pop("$type", None)

        if py_type in ("app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"):
            # contains post plus additional media; process each part separately
            possible_embeds.extend(embed.values())

        elif "images" in embed:  # app.bsky.embed.images(#view|#main)
            for img_ob in embed["images"]:
                img_link = img_ob.get("fullsize", img_ob.get("thumb"))
                if img_link:
                    embeded_images.add(img_link)
                    continue
                # The blob ref could be resolved via the API, but the same ref
                # has so far always been available in the other embed
                ref = (img_ob.get("image") or {}).get("ref") or {}
                ref_link = ref.get("link", ref.get("$link"))
                if ref_link:
                    image_references.add(ref_link)
                else:
                    unmapped_data.append({"loc": "embed.images", "obj": img_ob})

        elif py_type in ("app.bsky.embed.video#view", "app.bsky.embed.video"):
            # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
            if embed.get("thumbnail"):
                embeded_images.add(embed.get("thumbnail"))
            else:
                ref = (embed.get("video") or {}).get("ref") or {}
                ref_link = ref.get("link", ref.get("$link"))
                if ref_link:
                    image_references.add(ref_link)
                # otherwise: no thumb for video

        elif "record" in embed:  # app.bsky.embed.record(#view): quoted post or list
            # Note: these may also contain images that would be seen, but are not part of the original post
            quoted = embed["record"]
            # author seems to be a quoted post while creator a quoted "list"
            quoted_author = quoted.get("author", quoted.get("creator"))
            if quoted_author:
                if "handle" not in quoted_author:
                    # User may not be able to see original post
                    author_type = quoted_author.get("py_type", quoted_author.get("$type"))
                    if author_type == "app.bsky.feed.defs#blockedAuthor":
                        quoted_link = "VIEWER BLOCKED BY AUTHOR"
                    else:
                        # New unknown
                        unmapped_data.append({"loc": "embed.record.author", "obj": quoted_author})
                else:
                    quoted_user = quoted_author["handle"]
                    quoted_link = SearchBluesky.get_bsky_link(quoted_user, quoted['uri'].split('/')[-1])
            elif quoted.get("not_found"):
                # The DID is in quoted["uri"], but it is not normally displayed
                quoted_link = "DELETED"
            elif quoted.get("detached"):
                quoted_link = "REMOVED BY AUTHOR"
            else:
                quoted_ref = quoted['uri']

        elif "external" in embed:  # app.bsky.embed.external(#view)
            external = embed["external"]
            if external.get("uri"):
                embeded_links.add(external.get("uri"))
            thumb = external.get("thumb")
            if thumb:
                if isinstance(thumb, str):
                    embeded_images.add(thumb)
                else:
                    ref = thumb.get("ref") or {}
                    ref_link = ref.get("link", ref.get("$link"))
                    if ref_link:
                        image_references.add(ref_link)

        else:
            unmapped_data.append({"loc": f"embed.{py_type}",
                                  "obj": embed})

    # Replies allowed
    # https://docs.bsky.app/docs/tutorials/thread-gates
    # threadgate object does not appear to differentiate between the types of replies allowed
    replies_allowed = not item["threadgate"]

    # Labels (content moderation)
    labels = {label.get("val") for label in item["labels"] or []}
    if post_record.get("labels"):
        labels |= {label.get("val") for label in post_record["labels"].get("values", [])}
    labels.discard(None)  # a label without a value cannot be joined into the output

    # Language
    languages = "N/A" if not post_record.get("langs") else ",".join(post_record.get("langs"))

    # Missing references: blob refs whose image URL was never seen in any embed
    all_images = "".join(embeded_images)
    missing_image_refs = [ref for ref in image_references if ref not in all_images]
    if missing_image_refs:
        unmapped_data.append({"loc": "missing_image_refs", "obj": missing_image_refs})
    if quoted_ref:
        if not quoted_link or (quoted_link not in ("DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR")
                               and quoted_ref.split('/')[-1] not in quoted_link):
            unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})

    # Reference Posts (expanded to include handles during collection)
    # Note: handles may change; the true DID is stored in record.facets[].features[] w/ "did"
    lookup_errors = SearchBluesky.handle_lookup_error_messages
    metadata = item.get("4CAT_metadata", {})

    # Mentions: prefer the resolved handle, fall back to the DID if lookup failed
    mentions = [
        mention["handle"]
        if mention.get("handle") and mention["handle"].lower() not in lookup_errors
        else mention.get("did")
        for mention in metadata.get("mentions", [])
    ]

    # Reply to
    replied_to_post = None
    replied_to_user = None
    if post_record.get("reply"):
        parent_uri = post_record["reply"]["parent"]["uri"]
        reply_to = metadata.get("reply_to")
        if reply_to and reply_to.lower() not in lookup_errors:
            replied_to_user = reply_to
        else:
            # Use DID, though this will not create a working link
            replied_to_user = parent_uri.split("/")[2]
        replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, parent_uri.split("/")[-1])

    # Not mapped on purpose:
    # - record.entities refer to slices of the text that are also in the text or an embed; if they
    #   are not, bsky.app does not display them, so they are only metadata
    # - record.tags are author tags (not hashtags), not displayed and very rarely used

    return MappedItem({
        "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
        "query": item["4CAT_metadata"]["query"],
        "rank": item["4CAT_metadata"]["rank"],
        "id": item["uri"],
        "thread_id": post_record["reply"]["root"]["uri"] if post_record.get("reply") else item["uri"],
        "created_at": created_at.isoformat(),
        "author": item["author"]["handle"],
        "author_id": item["author"]["did"],
        "body": post_record["text"],
        "link": link,
        "tags": ",".join(tags),
        "like_count": item["like_count"],
        "quote_count": item["quote_count"],
        "reply_count": item["reply_count"],
        "repost_count": item["repost_count"],
        "quoted_post": quoted_link if quoted_link else "",
        "quoted_user": quoted_user if quoted_user else "",
        "replied_to_post": replied_to_post if replied_to_post else "",
        "replied_to_user": replied_to_user if replied_to_user else "",
        "replies_allowed": replies_allowed,
        "mentions": ",".join(mentions),
        "links": ",".join(embeded_links | links),
        "images": ",".join(embeded_images),
        "labels": ",".join(labels),
        "has_poll": has_poll,
        "languages": languages,

        "author_display_name": item["author"]["display_name"],
        "author_profile": author_profile,
        "author_avatar": item["author"]["avatar"],
        "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),

        "timestamp": int(created_at.timestamp()),
    }, message=f"Bluesky new mappings: {unmapped_data}")
@staticmethod
def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
    """
    Bluesky datetime string to datetime object.

    Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.

    :param str datetime_string:  The datetime string to convert
    :param str mode:  The mode to return the datetime object in [datetime, iso_string]
    :param bool raise_error:  Raise error if unable to parse else return datetime_string
      unchanged (this also covers a missing value such as None)
    :return datetime/str:  The converted datetime object
    :raises ValueError:  If ``mode`` is unknown, or if the string cannot be
      parsed and ``raise_error`` is true
    :raises TypeError:  If ``datetime_string`` is not a string (e.g. None) and
      ``raise_error`` is true
    """
    if mode not in ("datetime", "iso_string"):
        raise ValueError(f"Unknown mode {mode!r}; expected 'datetime' or 'iso_string'")

    try:
        datetime_object = parser.isoparse(datetime_string)
    except (ValueError, TypeError):
        # isoparse raises TypeError (not ValueError) for non-string input such as None
        if raise_error:
            raise
        return datetime_string

    return datetime_object if mode == "datetime" else datetime_object.isoformat()
@staticmethod
def get_bsky_link(handle, post_id):
    """
    Build the bsky.app web address of a post.

    :param str handle:  Handle (or DID) of the post's author
    :param str post_id:  Record key of the post, i.e. the last segment of its URI
    :return str:  URL of the post on bsky.app
    """
    url_template = "https://bsky.app/profile/{}/post/{}"
    return url_template.format(handle, post_id)
@staticmethod
def bsky_get_handle_from_did(client, did, max_retries=3, retry_delay=1):
    """
    Get handle from DID

    Looks up the profile of ``did`` and returns its current handle. Network
    errors and timeouts are retried. If the API rejects the request with a
    message (e.g. "Profile not found"), that message is returned in place of
    a handle; compare against ``handle_lookup_error_messages`` to detect it.

    :param client:  Logged-in atproto Client
    :param str did:  DID of the account to look up
    :param int max_retries:  Retries after the first failed attempt
    :param float retry_delay:  Seconds to wait before each retry
    :return str|None:  The handle, the API's error message, or None if the
      lookup did not succeed
    """
    for attempt in range(max_retries + 1):
        try:
            user_profile = client.app.bsky.actor.get_profile({"actor": did})
        except (NetworkError, InvokeTimeoutError):
            # Network error; wait and try again, but not after the last attempt
            if attempt < max_retries:
                time.sleep(retry_delay)
            continue
        except BadRequestError as e:
            # The response (or its parsed content) may be missing
            content = getattr(getattr(e, "response", None), "content", None)
            return getattr(content, "message", None) or None

        return user_profile.handle if user_profile else None

    return None
@staticmethod
def bsky_login(username, password, session_id, session_directory):
    """
    Login to Bluesky

    A stored session (``bsky_<session_id>.session`` in ``session_directory``)
    is reused when it exists; otherwise, or when the stored session is
    rejected and credentials are available, a fresh login with username and
    password is done. New and refreshed sessions are written back to the
    session file.

    :param str username:  Username for Bluesky
    :param str password:  Password for Bluesky
    :param str session_id:  Session ID to use for login
    :param Path session_directory:  Directory to save the session file
    :return Client:  Client object with login credentials
    :raises ValueError:  If neither a session ID nor both username and
      password are given, or if the stored session has expired and no
      credentials are available
    """
    if not session_id:
        if not username or not password:
            raise ValueError("Must provide both username and password or else session_id.")
        session_id = SearchBluesky.create_session_id(username, password)

    session_path = session_directory.joinpath("bsky_" + session_id + ".session")

    def on_session_change(event: SessionEvent, session: Session) -> None:
        """
        Save session to file; atproto session change event handler should keep the session up to date

        https://atproto.blue/en/latest/atproto_client/auth.html
        """
        print('Session changed:', event, repr(session))
        if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
            print('Saving changed session')
            with session_path.open("w") as session_file:
                session_file.write(session.export())

    client = Client()
    client.on_session_change(on_session_change)

    if not session_path.exists():
        # No stored session; login with username and password
        client.login(login=username, password=password)
        return client

    # Read the file first so it is closed before the session handler may rewrite it
    session_string = session_path.read_text()
    try:
        client.login(session_string=session_string)
    except BadRequestError as e:
        content = getattr(getattr(e, "response", None), "content", None)
        message = getattr(content, "message", None)
        if not (username and password):
            if message == 'Token has expired':
                raise ValueError("Session token has expired; please re-login with username and password.") from e
            # Do not hand back a client that is not logged in
            raise
        # Stored session is not usable (e.g. token has expired); start a fresh one
        client.login(login=username, password=password)

    return client
@staticmethod
def create_session_id(username, password):
    """
    Generate a filename for the session file

    This is a combination of username and password, but hashed
    so that one cannot actually derive someone's information.
    Surrounding whitespace of both values is ignored.

    :param str username:  Username for Bluesky
    :param str password:  Password for Bluesky
    :return str: A hash value derived from the input
    """
    hash_base = username.strip() + str(password).strip()
    # UTF-8 so that non-ASCII credentials work; for ASCII-only input the
    # bytes (and thus the hash) are identical to an ASCII encoding
    return hashlib.blake2b(hash_base.encode("utf-8")).hexdigest()

Search for posts in Bluesky

type = 'bsky-search'
category = 'Search'
title = 'Bluesky Search'
description = 'Collects Bluesky posts via its API.'
extension = 'ndjson'
is_local = False
is_static = False
config = {'bsky-search.max_results': {'type': 'string', 'help': 'Maximum results per query', 'coerce_type': <class 'int'>, 'min': 0, 'default': 50000, 'tooltip': "Amount of results (e.g., posts) per query. '0' will allow unlimited."}}
handle_lookup_error_messages = ['account is deactivated', 'profile not found', 'account has been suspended']
@classmethod
def get_options(cls, parent_dataset=None, config=None):
    """
    Get processor options

    Builds the search form options and adjusts the ``max_posts`` option to
    the configured maximum number of results per query
    (``bsky-search.max_results``, where 0 means unlimited).

    :param DataSet parent_dataset:  An object representing the dataset that
      the processor would be run on (not used by this datasource)
    :param config:  Configuration reader used to look up
      ``bsky-search.max_results``; when None, a maximum of 100 is assumed
    :return dict:  Option definitions
    """
    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "Collects Bluesky posts via its API.\n\nYour login credentials will be sent to the 4CAT server "
                    "and stored there while data is fetched. After the dataset has been created your credentials "
                    "will be deleted from the server. \n[See tips and tricks on how to query Bluesky](https://bsky.social/about/blog/05-31-2024-search)."
        },
        "username": {
            "type": UserInput.OPTION_TEXT,
            "help": "Bluesky Username",
            "cache": True,
            "sensitive": True,
            "tooltip": "If no server is specified, .bsky.social is used."
        },
        "password": {
            "type": UserInput.OPTION_TEXT,
            "help": "Bluesky Password",
            "cache": True,  # tells the frontend to cache this value
            "sensitive": True,  # tells the backend to delete this value after use
            "password": True,  # tells the frontend this is a password type
        },
        "divider": {
            "type": UserInput.OPTION_DIVIDER
        },
        "query": {
            "type": UserInput.OPTION_TEXT_LARGE,
            "help": "Search Queries",
            "tooltip": "Separate with commas or line breaks."
        },
        "max_posts": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max posts per query",
            "min": 1,
            "default": 100
        },
        "daterange": {
            "type": UserInput.OPTION_DATERANGE,
            "help": "Date range",
            "tooltip": "The date range for the search. No date range will search all posts."
        },
    }

    # Update the max_posts setting from config (None is the default argument)
    configured_max = config.get('bsky-search.max_results', default=100) if config is not None else 100
    max_posts = int(configured_max)
    if max_posts == 0:
        # This is potentially madness
        options["max_posts"]["tooltip"] = "Set to 0 to collect all posts."
        options["max_posts"]["min"] = 0
    else:
        options["max_posts"]["max"] = max_posts
        # The default may not exceed the configured maximum
        options["max_posts"]["default"] = min(options["max_posts"]["default"], max_posts)

    return options

Get processor options

Builds the search options and limits the max_posts option to the configured maximum number of results per query (bsky-search.max_results).

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
  • config: Configuration reader used to look up the maximum number of results per query (bsky-search.max_results).
@staticmethod
def validate_query(query, request, config):
    """
    Validate Bluesky query

    Checks that a query and login credentials were given, normalises the
    username, tests the credentials by logging in, checks the date range and
    asks for explicit confirmation of potentially very long-running queries.

    :param dict query:  Query parameters, from client-side.
    :param request:  Flask request
    :param config:  Configuration reader; PATH_ROOT and PATH_SESSIONS locate
      the directory where login sessions are stored
    :return dict:  Safe query parameters
    :raises QueryParametersException:  If the query or credentials are invalid
    :raises QueryNeedsExplicitConfirmationException:  If the query may take
      very long and has not been confirmed in the frontend yet
    """
    def api_error_message(error):
        # Best-effort: the response or its parsed content may be missing
        content = getattr(getattr(error, "response", None), "content", None)
        return getattr(content, "message", None)

    # no query 4 u
    if not query.get("query", "").strip():
        raise QueryParametersException("You must provide a search query.")

    username = (query.get("username") or "").strip()
    if not username or not query.get("password", None):
        raise QueryParametersException("You need to provide valid Bluesky login credentials first.")

    # Remove @ at the start
    if username.startswith("@"):
        username = username[1:]
    # If no server is specified, default to .bsky.social
    if "." not in username:
        username += ".bsky.social"
    query["username"] = username

    # Test login credentials
    session_id = SearchBluesky.create_session_id(query["username"], query["password"])
    session_directory = config.get("PATH_ROOT").joinpath(config.get("PATH_SESSIONS"))
    try:
        SearchBluesky.bsky_login(username=query["username"], password=query["password"], session_id=session_id, session_directory=session_directory)
    except UnauthorizedError as e:
        raise QueryParametersException("Invalid Bluesky login credentials.") from e
    except RequestException as e:
        message = api_error_message(e)
        if message == 'Rate Limit Exceeded':
            headers = getattr(e.response, "headers", None) or {}
            reset = headers.get("ratelimit-reset")
            if reset:
                lifted_at = datetime.fromtimestamp(int(reset))
                raise QueryParametersException(f"Bluesky rate limit exceeded. Try again after {lifted_at.strftime('%Y-%m-%d %H:%M:%S')}.") from e
            raise QueryParametersException("Bluesky rate limit exceeded. Try again later.") from e
        raise QueryParametersException(f"Bluesky login failed. {message if message else e}") from e
    except BadRequestError as e:
        message = api_error_message(e)
        raise QueryParametersException(f"Bluesky login failed. {message if message else e}") from e

    # sanitize query
    sanitized_query = [q.strip() for q in query.get("query").replace("\n", ",").split(",") if q.strip()]

    # the dates need to make sense as a range to search within
    min_date, max_date = query.get("daterange") or (None, None)
    if min_date and max_date and min_date > max_date:
        raise QueryParametersException("The start date must be before the end date.")

    # Only check this if not already confirmed by the frontend
    posts_per_second = 55  # gathered from simply checking start/end times of logs
    if not query.get("frontend-confirm"):
        # Estimate is not returned; use max_posts as a rough estimate
        max_posts = query.get("max_posts", 100)
        expected_posts = max_posts * len(sanitized_query)
        # Warn if process may take more than ~1 hours
        if expected_posts > (posts_per_second * 3600):
            expected_time = timify(expected_posts / posts_per_second)
            raise QueryNeedsExplicitConfirmationException(f"This query matches approximately {expected_posts} posts and may take {expected_time} to complete. Do you want to continue?")
        elif max_posts == 0 and not min_date:
            raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query has no minimum date and thus may take a very, very long time to complete. Do you want to continue?")
        elif max_posts == 0:
            raise QueryNeedsExplicitConfirmationException("No maximum number of posts set! This query may take a long time to complete. Do you want to continue?")

    return {
        "max_posts": query.get("max_posts"),
        "query": ",".join(sanitized_query),
        "username": query.get("username"),
        "password": query.get("password"),
        "session_id": session_id,
        "min_date": min_date,
        "max_date": max_date,
    }

Validate Bluesky query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • config: Configuration reader, used to locate the directory where login sessions are stored (PATH_ROOT / PATH_SESSIONS)
Returns

Safe query parameters

def get_items(self, query):
185    def get_items(self, query):
186        """
187        Execute a query; get messages for given parameters
188
189        Basically a wrapper around execute_queries() to call it with asyncio.
190
191        :param dict query:  Query parameters, as part of the DataSet object
192        :return list:  Posts, sorted by thread and post ID, in ascending order
193        """
194        if not query.get("session_id") and (not query.get("username") or not query.get("password")):
195            return self.dataset.finish_with_error("Your Bluesky login credentials are no longer available in 4CAT; please re-create this datasource.")
196
197        session_id = SearchBluesky.create_session_id(query.get("username"), query.get("password")) if not query.get("session_id") else query["session_id"]
198        try:
199            client = SearchBluesky.bsky_login(username=query.get("username"), password=query.get("password"), session_id=session_id, session_directory=self.config.get("PATH_ROOT").joinpath(self.config.get("PATH_SESSIONS")))
200        except (UnauthorizedError, RequestException, BadRequestError) as e:
201            self.dataset.log(f"Bluesky login failed: {e}")
202            return self.dataset.finish_with_error("Bluesky login failed; please re-create this datasource.")
203
204        self.dataset.update_status(f"Collecting posts from Bluesky as {client.me.handle}")
205
206        max_posts = query.get("max_posts", 100)
207        limit = 100 if (max_posts > 100 or max_posts == 0) else max_posts
208
209        # Handle reference mapping; user references use did instead of dynamic handle
210        did_to_handle = {}
211
212        query_parameters = {
213            "limit": limit,
214        }
215
216        # Add start and end dates if provided
217        if self.parameters.get("min_date"):
218            query_parameters['since'] = datetime.fromtimestamp(self.parameters.get("min_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
219        if self.parameters.get("max_date"):
220            query_parameters['until'] = datetime.fromtimestamp(self.parameters.get("max_date")).strftime('%Y-%m-%dT%H:%M:%SZ')
221
222        queries = query.get("query").split(",")
223        num_queries = len(queries)
224        total_posts = 0
225        i = 0
226        last_query = None
227        last_date = None
228        while queries:
229            query = queries.pop(0)
230            if query == last_query:
231                # Check if there are continued posts from the last query
232                query_parameters['until'] = last_date.strftime('%Y-%m-%dT%H:%M:%SZ')
233                self.dataset.log(f"Continuing query ({i} of {num_queries}): {query} from {last_date.strftime('%Y-%m-%dT%H:%M:%SZ')}")
234            else:
235                # New query
236                query_post_ids = set()
237                i += 1
238                rank = 0
239                last_query = query
240                last_date = None
241                self.dataset.update_status(f"Collecting query ({i} of {num_queries}): {query}")
242                query_requests = 0
243
244            query_parameters["q"] = query
245            cursor = None  # Start with no cursor (first page)
246            search_for_invalid_post = False
247            invalid_post_counter = 0
248            while True:
249                if self.interrupted:
250                    raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
251                # Query posts, including pagination (cursor for next page)
252                tries = 0
253                response = None
254                while tries < 3:
255                    query_parameters["cursor"] = cursor
256                    try:
257                        response = client.app.bsky.feed.search_posts(params=query_parameters)
258                        break
259                    except ModelError as e:
260                        # Post validation error; one post is unable to be read
261                        # Pattern: some invalid post raises error, we switch from higher limit (usually 100) to 1 in
262                        # order to collect post by post, invalid post is identified again, we switch back to higher
263                        # limit and continue as normal, at the "end" of a cursor/query life (~10k posts) a NetworkError
264                        # is raised with detail refering to a server error 502 InternalServerError, we catch that and
265                        # add the query back to the queue with a new "until" date to continue the query
266                        # https://github.com/bluesky-social/atproto/issues/3446
267                        if not search_for_invalid_post:
268                            # New invalid post, search and skip
269                            self.dataset.log(f"Invalid post detected; searching post by post: {e}")
270                            search_for_invalid_post = True
271                            # Currently we must search post by post to find the invalid post
272                            query_parameters["limit"] = 1
273                        else:
274                            # Found invalid post, skip, reset counters
275                            self.dataset.log(
276                                f"Invalid post identified; skipping and continue with query as normal: {e}")
277                            search_for_invalid_post = False
278                            # Reset limit to normal
279                            query_parameters["limit"] = limit
280                            invalid_post_counter = 0
281                            cursor = str(int(cursor) + 1) if cursor else None
282                        # Re-query with new cursor & limit
283                        continue
284
285                    except InvokeTimeoutError as e:
286                        # Timeout error, but can occur for odd queries with no results
287                        self.dataset.log(f"Bluesky request error for query {query}: {e}")
288                        time.sleep(1)
289                        tries += 2
290                        continue
291                    except NetworkError as e:
292                        # 502 InternalServerError: occurs if switch limits in a "set" (i.e. the vague 10k posts cursor limit), I seem to get this error around the 10k mark instead of just a missing cursor as normal
293                        self.dataset.log(f"Bluesky network error for query {query}; retrying: {e}")
294                        time.sleep(1 + (tries * 10))
295                        queries.insert(0, query)
296                        break
297
298                if not response:
299                    # Expected from NetworkError, but the query will have been added back to the queue
300                    # If not, then there was a problem with the query
301                    if len(queries) == 0:
302                        self.dataset.update_status("Error collecting posts from Bluesky; see log for details", is_final=True)
303                    if query not in queries:
304                        # Query was not added back; there was an unexpected issue with the query itself
305                        self.dataset.update_status(f"Error continuing {query} from Bluesky (see log for details); continuing to next query")
306                    break
307
308                query_requests += 1
309                items = response['posts'] if hasattr(response, 'posts') else []
310
311                if search_for_invalid_post:
312                    invalid_post_counter += 1
313                    if invalid_post_counter >= 100:
314                        #  Max limit is 100; this should not occur, but we do not want to continue searching post by post indefinitely
315                        self.dataset.log("Unable to identify invalid post; discontinuing search")
316                        query_parameters["limit"] = limit
317                        search_for_invalid_post = False
318                        invalid_post_counter = 0
319
320                    if not items:
321                        # Sometimes no post is returned, but there still may be posts following
322                        self.dataset.log(f"Query {query} w/ params {query_parameters} returned no posts: {response}")
323                        # TODO: this is odd; no information is returned as to why that one item is not returned and no error is raised
324                        cursor = str(int(cursor) + 1) if cursor else None
325                        continue
326
327                new_posts = 0
328                # Handle the posts
329                for item in items:
330                    if 0 < max_posts <= rank:
331                        break
332
333                    if self.interrupted:
334                        raise ProcessorInterruptedException("Interrupted while getting posts from the Bluesky API")
335
336                    post = item.model_dump()
337                    post_id = post["uri"]
338                    # Queries use the indexed_at date for time-based pagination (as opposed to created_at); used to continue query if needed
339                    last_date = SearchBluesky.bsky_convert_datetime_string(post.get("indexed_at"))
340                    if post_id in query_post_ids:
341                        # Skip duplicate posts
342                        continue
343
344                    new_posts += 1
345                    query_post_ids.add(post_id)
346
347                    # Add user handles from references
348                    did_to_handle[post["author"]["did"]] = post["author"]["handle"]
349                    # Mentions
350                    mentions = []
351                    if post["record"].get("facets"):
352                        for facet in post["record"]["facets"]:
353                            for feature in facet.get("features", {}):
354                                if feature.get("did"):
355                                    if feature["did"] in did_to_handle:
356                                        mentions.append({"did": feature["did"], "handle": did_to_handle[feature["did"]]})
357                                    else:
358                                        handle = SearchBluesky.bsky_get_handle_from_did(client, feature["did"])
359                                        if handle:
360                                            if handle.lower() in self.handle_lookup_error_messages:
361                                                self.dataset.log(f"Bluesky: user ({feature['did']}) {handle}")
362                                            mentions.append({"did": feature["did"], "handle": handle})
363                                            did_to_handle[feature["did"]] = handle
364                                        else:
365                                            mentions.append({"did": feature["did"], "handle": None})
366                                            self.dataset.log(f"Bluesky: could not lookup the handle for {feature['did']}")
367                    # Reply to
368                    reply_to_handle = None
369                    if post["record"].get("reply"):
370                        reply_to_did = post["record"]["reply"]["parent"]["uri"].split("/")[2]
371                        if reply_to_did in did_to_handle:
372                            reply_to_handle = did_to_handle[reply_to_did]
373                        else:
374                            handle = SearchBluesky.bsky_get_handle_from_did(client, reply_to_did)
375                            if handle:
376                                if handle.lower() in self.handle_lookup_error_messages:
377                                    self.dataset.log(f"Bluesky: user ({reply_to_did}) {handle}")
378                                reply_to_handle = handle
379                                did_to_handle[reply_to_did] = handle
380                            else:
381                                self.dataset.log(f"Bluesky: could not find handle for {reply_to_did}")
382
383
384                    post.update({"4CAT_metadata": {
385                        "collected_at": datetime.now().timestamp(),
386                        "query": query,
387                        "rank": rank,
388                        "mentions": mentions,
389                        "reply_to": reply_to_handle if reply_to_handle else None,
390                    }})
391                    rank += 1
392                    yield post
393                    total_posts += 1
394
395                # Check if there is a cursor for the next page
396                cursor = response['cursor']                
397                if max_posts != 0 and rank % (max_posts // 10) == 0:
398                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected out of {max_posts}")
399                    self.dataset.update_progress(total_posts / (max_posts * num_queries))
400                elif max_posts == 0 and rank % 1000 == 0:
401                    self.dataset.update_status(f"Progress query {query}: {rank} posts collected")
402
403                if 0 < max_posts <= rank:
404                    self.dataset.update_status(
405                        f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
406                    break
407
408                if not cursor:
409                    if new_posts:
410                        # Bluesky API seems to stop around 10000 posts and not return a cursor
411                        # Re-query with the same query to get the next set of posts using last_date (set above)
412                        self.dataset.log(f"Query {query}: {query_requests} requests")
413                        queries.insert(0, query)
414                    else:
415                        # No new posts; if we have not hit the max_posts, but no new posts are being returned, then we are done
416                        self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
417
418                    if rank:
419                        self.dataset.update_status(f"Collected {rank} posts {'of ' + str(max_posts) if max_posts != 0 else ''} for query {query}")
420                    break  # No more pages, stop the loop
421                elif not items:
422                    self.dataset.log(f"Query {query}: {query_requests} requests; no additional posts returned")
423                    break

Execute a query; get messages for given parameters

Basically a wrapper around execute_queries() to call it with asyncio.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Posts, sorted by thread and post ID, in ascending order

@staticmethod
def map_item(item):
425    @staticmethod
426    def map_item(item):
427        """
428        Convert item object to 4CAT-ready data object
429
430        :param dict item:  item to parse
431        :return dict:  4CAT-compatible item object
432        """
433        unmapped_data = []
434
435        # Add link to post; this is a Bluesky-specific URL and may not always be accurate
436        link = SearchBluesky.get_bsky_link(item['author']['handle'], item['uri'].split('/')[-1])
437        author_profile = f"https://bsky.app/profile/{item['author']['handle']}"
438
439        created_at = SearchBluesky.bsky_convert_datetime_string(item["record"].get("created_at",item["record"].get("createdAt")))
440
441        # Tags
442        tags = set()
443        links = set()
444        mentions_did = set()
445        has_poll = False
446        if item["record"].get("facets"):
447            for facet in item["record"].get("facets"):
448                for feature in facet.get("features"):
449                    if feature.get("tag"):
450                        tags.add(feature.get("tag"))
451                    elif feature.get("uri"):
452                        links.add(feature.get("uri"))
453                    elif feature.get("did"):
454                        mentions_did.add(feature.get("did"))
455                    elif feature.get("number"):
456                        has_poll = True
457                    else:
458                        unmapped_data.append({"loc": "record.facets.features", "obj": feature})
459                if "features" not in facet:
460                    unmapped_data.append({"loc": "record.facets", "obj": facet})
461
462        # Embeds are in both the item and the record; so far these always contain same item
463        embeded_links = set()
464        embeded_images = set()
465        image_references = set()
466        quoted_link = None
467        quoted_user = None
468        quoted_ref = None
469        possible_embeds = [item.get("embed", {}), item["record"].get("embed", {})]
470        while possible_embeds:
471            embed = possible_embeds.pop(0)
472            if not embed:
473                continue
474
475            py_type = embed.pop("py_type") if "py_type" in embed else (embed.pop("$type") if "$type" in embed else None)
476            if py_type in ["app.bsky.embed.recordWithMedia#view", "app.bsky.embed.recordWithMedia"]:
477                # contains post plus additional media
478                for key, media_ob in embed.items():
479                    possible_embeds.append(media_ob)
480
481            elif "images" in embed: # py_type in ["app.bsky.embed.images#view", "app.bsky.embed.images", "app.bsky.embed.images#main"]
482                for img_ob in embed["images"]:
483                    img_link = img_ob.get("fullsize", img_ob.get("thumb"))
484                    if img_link:
485                        embeded_images.add(img_link)
486                    elif img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")):
487                        # ob.get("image").get("ref").get("link") will have a reference that could be linked via API
488                        # BUT ref has already been obtained in other embeds...
489                        image_references.add(img_ob.get("image", {}).get("ref", {}).get("link", img_ob.get("image", {}).get("ref", {}).get("$link")))
490                    else:
491                        unmapped_data.append({"loc": "embed.images", "obj": img_ob})
492            elif py_type in ["app.bsky.embed.video#view", "app.bsky.embed.video"]:
493                # Does not appear to be direct video links, just thumbnail (Bluesky API may be able to get more)
494                if embed.get("thumbnail"):
495                    embeded_images.add(embed.get("thumbnail"))
496                elif embed.get("video", {}).get("ref", {}).get("link"):
497                    image_references.add(embed.get("video", {}).get("ref", {}).get("link"))
498                else:
499                    # No thumb for video
500                    pass
501            elif "record" in embed: # py_type in ["app.bsky.embed.record#view", "app.bsky.embed.record"]
502                # Quoted post
503                # Note: these may also contain images that would be seen, but are not part of the original post
504                if embed["record"].get("author", embed["record"].get("creator")):
505                    if "handle" not in embed["record"].get("author", embed["record"].get("creator")):
506                        # User may not be able to see original post
507                        if "app.bsky.feed.defs#blockedAuthor" == embed["record"].get("author", embed["record"].get("creator"))["py_type"]:
508                            quoted_link = "VIEWER BLOCKED BY AUTHOR"
509                        else:
510                            # New unknown
511                            unmapped_data.append({"loc": "embed.record.author", "obj": embed["record"].get("author", embed["record"].get("creator"))})
512                    else:
513                        # author seems to be a quoted post while creator a quoted "list"
514                        quoted_user = embed["record"].get("author", embed["record"].get("creator"))["handle"]
515                        quoted_link = SearchBluesky.get_bsky_link(quoted_user, embed['record']['uri'].split('/')[-1])
516                elif embed["record"].get("not_found"):
517                    quoted_link = "DELETED"
518                    # We do have the DID, but this information is not normally displayed
519                    # quoted_user = embed["record"]['uri'].split("/")[2]
520                elif embed["record"].get("detached"):
521                    quoted_link = "REMOVED BY AUTHOR"
522                else:
523                    quoted_ref = embed["record"]['uri']
524            elif "external" in embed: # py_type in ["app.bsky.embed.external#view", "app.bsky.embed.external"]
525                if embed["external"].get("uri"):
526                    embeded_links.add(embed["external"].get("uri"))
527                if embed["external"].get("thumb"):
528                    if isinstance(embed["external"]["thumb"], str):
529                        embeded_images.add(embed["external"]["thumb"])
530                    else:
531                        image_references.add(embed["external"]["thumb"].get("ref", {}).get("link", ""))
532                else:
533                    # No thumb for link
534                    pass
535            else:
536                unmapped_data.append({"loc": f"embed.{py_type}",
537                                      "obj": embed})
538
539        # Replies allowed
540        # https://docs.bsky.app/docs/tutorials/thread-gates
541        # threadgate object does not appear to differentiate between the types of replies allowed
542        replies_allowed = True if not item["threadgate"] else False
543
544        # Labels (content moderation)
545        labels = set() if not item["labels"] else set([label.get("val") for label in item["labels"]])
546        if item["record"].get("labels"):
547            labels = labels | set([label.get("val") for label in item["record"]["labels"].get("values",[])])
548
549        # Language
550        languages = "N/A" if not item["record"].get("langs") else ",".join(item["record"].get("langs"))
551
552        # Missing references
553        if any([ref for ref in image_references if ref not in "".join(embeded_images)]):
554            unmapped_data.append({"loc": "missing_image_refs", "obj": [ref for ref in image_references if ref not in "".join(embeded_images)]})
555        if quoted_ref:
556            if not quoted_link or (quoted_link not in ["DELETED", "REMOVED BY AUTHOR", "VIEWER BLOCKED BY AUTHOR"] and quoted_ref.split('/')[-1] not in quoted_link):
557                unmapped_data.append({"loc": "missing_quote_ref", "obj": quoted_ref})
558
559        # Reference Posts (expanded to include handles during collection)
560        # None: handles may change; true DID from original object stored item["record"]["facets"]["features"] w/ "did"
561        # Mentions
562        mentions = [(mention.get("handle") if (mention.get("handle") and mention["handle"].lower() not in SearchBluesky.handle_lookup_error_messages) else mention.get("did")) for mention in item.get("4CAT_metadata", {}).get("mentions", [])]
563        # Reply to
564        replied_to_post = None
565        replied_to_user = None
566        if item["record"].get("reply"):
567            if item["4CAT_metadata"]["reply_to"] and item["4CAT_metadata"]["reply_to"].lower() not in SearchBluesky.handle_lookup_error_messages:
568                replied_to_user = item["4CAT_metadata"]["reply_to"]
569            else:
570                # Use DID, though this will not create a working link
571                replied_to_user = item["record"]["reply"]["parent"]["uri"].split("/")[2]
572            replied_to_post = SearchBluesky.get_bsky_link(replied_to_user, item["record"]["reply"]["parent"]["uri"].split("/")[-1])
573
574        # These refer to slices of the text, but are also contained in the text or as an embed. If they are NOT also in the text and/or embed fields, then they are NOT displayed in bsky.app UI and thus only metadata
575        # if item["record"].get("entities"):
576        #     unmapped_data.append({"loc": "record.entities", "obj": item["record"]["entities"]})
577
578        # Author tags, not hashtags, not seen, very rarely used
579        # if item["record"].get("tags"):
580        #     unmapped_data.append({"loc": "record.tags", "obj": item["record"].get("tags")})
581
582        return MappedItem({
583            "collected_at": datetime.fromtimestamp(item["4CAT_metadata"]["collected_at"]).isoformat(),
584            "query": item["4CAT_metadata"]["query"],
585            "rank": item["4CAT_metadata"]["rank"],
586            "id": item["uri"],
587            "thread_id": item["record"]["reply"]["root"]["uri"] if item["record"].get("reply") else item["uri"],
588            "created_at": created_at.isoformat(),
589            "author": item["author"]["handle"],
590            "author_id": item["author"]["did"],
591            "body": item["record"]["text"],
592            "link": link,
593            "tags": ",".join(tags),
594            "like_count": item["like_count"],
595            "quote_count": item["quote_count"],
596            "reply_count": item["reply_count"],
597            "repost_count": item["repost_count"],
598            "quoted_post": quoted_link if quoted_link else "",
599            "quoted_user": quoted_user if quoted_user else "",
600            "replied_to_post": replied_to_post if replied_to_post else "",
601            "replied_to_user": replied_to_user if replied_to_user else "",
602            "replies_allowed": replies_allowed,
603            "mentions": ",".join(mentions),
604            "links": ",".join(embeded_links | links),
605            "images": ",".join(embeded_images),
606            "labels": ",".join(labels),
607            "has_poll": has_poll,
608            "languages": languages,
609
610            "author_display_name": item["author"]["display_name"],
611            "author_profile": author_profile,
612            "author_avatar": item["author"]["avatar"],
613            "author_created_at": SearchBluesky.bsky_convert_datetime_string(item["author"]["created_at"], mode="iso_string", raise_error=False),
614
615            "timestamp": int(created_at.timestamp()),
616        }, message=f"Bluesky new mappings: {unmapped_data}")

Convert item object to 4CAT-ready data object

Parameters
  • dict item: item to parse
Returns

4CAT-compatible item object

@staticmethod
def bsky_convert_datetime_string(datetime_string, mode='datetime', raise_error=True):
618    @staticmethod
619    def bsky_convert_datetime_string(datetime_string, mode="datetime", raise_error=True):
620        """
621        Bluesky datetime string to datetime object.
622
623        Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.
624
625        :param str datetime_string:  The datetime string to convert
626        :param str mode:  The mode to return the datetime object in [datetime, iso_string]
627        :param bool raise_error:  Raise error if unable to parse else return datetime_string
628        :return datetime/str:  The converted datetime object
629        """
630        try:
631            datetime_object = parser.isoparse(datetime_string)
632        except ValueError as e:
633            if raise_error:
634                raise e
635            return datetime_string
636        
637        if mode == "datetime":
638            return datetime_object
639        elif mode == "iso_string":
640            return datetime_object.isoformat()

Convert a Bluesky datetime string to a datetime object.

Mode "datetime" returns a datetime object, while "iso_string" returns an ISO formatted string.

Parameters
  • str datetime_string: The datetime string to convert
  • str mode: The mode to return the datetime object in [datetime, iso_string]
  • bool raise_error: Raise error if unable to parse else return datetime_string
Returns

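The converted datetime object (mode "datetime") or ISO formatted string (mode "iso_string"); if parsing fails and raise_error is false, the input is returned unchanged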

@staticmethod
def bsky_get_handle_from_did(client, did):
650    @staticmethod
651    def bsky_get_handle_from_did(client, did):
652        """
653        Get handle from DID
654        """
655        tries = 0
656        while True:
657            try:
658                user_profile = client.app.bsky.actor.get_profile({"actor": did})
659                if user_profile:
660                    return user_profile.handle
661                else:
662                    return None
663            except (NetworkError, InvokeTimeoutError):
664                # Network error; try again
665                tries += 1
666                time.sleep(1)
667                if tries > 3:
668                    return None
669                continue
670            except BadRequestError as e:
671                if e.response.content.message:
672                    return e.response.content.message
673                return None

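Get the handle for an account from its DID. Network errors and timeouts are retried a few times before giving up and returning None; if the request is rejected as a bad request, the API's error message is returned instead of a handle.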

@staticmethod
def bsky_login(username, password, session_id, session_directory):
675    @staticmethod
676    def bsky_login(username, password, session_id, session_directory):
677        """
678        Login to Bluesky
679
680        :param str username:  Username for Bluesky
681        :param str password:  Password for Bluesky
682        :param str session_id:  Session ID to use for login
683        :param Path session_directory:  Directory to save the session file
684        :return Client:  Client object with login credentials
685        """
686        if not session_id:
687            session_id = SearchBluesky.create_session_id(username, password)
688        elif (not username or not password) and not session_id:
689            raise ValueError("Must provide both username and password or else session_id.")
690
691        session_path = session_directory.joinpath("bsky_" + session_id + ".session")
692
693        def on_session_change(event: SessionEvent, session: Session) -> None:
694            """
695            Save session to file; atproto session change event handler should keep the session up to date
696
697            https://atproto.blue/en/latest/atproto_client/auth.html
698            """
699            print('Session changed:', event, repr(session))
700            if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
701                print('Saving changed session')
702                with session_path.open("w") as session_file:
703                    session_file.write(session.export())
704
705        client = Client()
706        client.on_session_change(on_session_change)
707        if session_path.exists():
708            with session_path.open() as session_file:
709                session_string = session_file.read()
710                try:
711                    client.login(session_string=session_string)
712                except BadRequestError as e:
713                    if e.response.content.message == 'Token has expired':
714                        # Token has expired; try to refresh
715                        if username and password:
716                            client.login(login=username, password=password)
717                        else:
718                            raise ValueError("Session token has expired; please re-login with username and password.")
719        else:
720            # Were not able to log in via session string; login with username and password
721            client.login(login=username, password=password)
722        return client

Login to Bluesky

Parameters
  • str username: Username for Bluesky
  • str password: Password for Bluesky
  • str session_id: Session ID to use for login
  • Path session_directory: Directory to save the session file
Returns

Client object with login credentials

@staticmethod
def create_session_id(username, password):
724    @staticmethod
725    def create_session_id(username, password):
726        """
727        Generate a filename for the session file
728
729        This is a combination of username and password, but hashed
730        so that one cannot actually derive someone's information.
731
732        :param str username:  Username for Bluesky
733        :param str password:  Password for Bluesky
734        :return str: A hash value derived from the input
735        """
736        hash_base = username.strip() + str(password).strip()
737        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Generate a filename for the session file

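This is a combination of username and password, hashed with BLAKE2b so that the credentials do not appear in plain text. The hash is unsalted, so it should not be treated as secret: weak passwords could still be recovered by brute force.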

Parameters
  • str username: Username for Bluesky
  • str password: Password for Bluesky
Returns

A hash value derived from the input