Keep the Firehose Flowing: Adding Resilience to a Bluesky Feed Generator

January 12, 2025

After I released the NBA Now feed on Bluesky, I noticed that a couple of times a day the feed would stop reading the firehose stream. There were no errors in the logs. The Flask app was still running, since you could still load the feed, but new skeets were no longer being processed from the firehose.

A similar issue was reported in the GitHub repo for the feed generator template; the original poster concluded that it was ultimately a client-side issue. On the same thread, another user resolved the problem by adjusting the timeout value on the firehose client itself inside the Python atproto SDK.

Based on what I was observing, the likely culprit was the socket connection timing out and not reconnecting. My preference was to fix the issue without modifying the atproto Python SDK like the other user did; instead, I wanted to implement the solution inside the feed generator itself to avoid having to maintain a fork of the SDK.

The result is this fork of the feed generator template, which includes a heartbeat monitor to detect and recover from socket connection issues.

Adding the Heartbeat Monitor

To detect and recover from socket connection issues, I implemented a heartbeat monitor as part of the data ingestion from the firehose. The monitor watches for delays since the last message was received. Each message from the firehose triggers a heartbeat via the beat() method, which records the time of the last heartbeat.

def beat(self):
    previous_beat = self.last_heartbeat
    self.last_heartbeat = time.time()
    time_delta = self.last_heartbeat - previous_beat
    
    if time_delta > self.timeout:
        logger.warning(
            f"Late heartbeat detected! Time since last beat: {time_delta:.1f}s"
        )
        
    # Reset consecutive timeouts on successful beat
    self.consecutive_timeouts = 0
    
    # Ensure timer is always running
    if not self.timer or not self.timer.is_alive():
        self.start_timer()
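
For context, here is a minimal sketch of how the monitor might be wired into the firehose message handler. The constructor signature, handler, and helper names are illustrative assumptions rather than the exact code in the fork.

monitor = HeartbeatMonitor(timeout=20)
monitor.start_timer()

def on_message_handler(message):
    # Every firehose message counts as a heartbeat before normal processing
    monitor.beat()
    process_message(message)  # hypothetical: the feed's existing filtering/indexing logic

client.start(on_message_handler)  # firehose client from the feed generator template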

A timer runs periodically on the HeartbeatMonitor class to check whether the last heartbeat is within a threshold (20 seconds in this case). Heroku automatically closes socket connections after 55 seconds of inactivity, so a 20-second threshold leaves a buffer for the reconnection to happen before the socket is closed. Given the volume of posts coming from the firehose, it's rare for a 20-second gap between messages to occur organically.

def start_timer(self):
    # Cancel any existing timer before scheduling the next check
    if self.timer:
        self.timer.cancel()
    self.timer = Timer(HEARTBEAT_CHECK_INTERVAL, self.check_heartbeat)
    self.timer.daemon = True  # don't block process shutdown
    self.timer.start()
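
For reference, a minimal sketch of what the monitor's setup could look like. The check-interval value, default arguments, and constructor shape are illustrative assumptions; only the attribute names come from the code shown in this post.

import threading
import time

HEARTBEAT_CHECK_INTERVAL = 5  # seconds between timer checks (illustrative value)

class HeartbeatMonitor:
    def __init__(self, timeout=20, max_consecutive_timeouts=3, client_ref=None):
        # A 20-second threshold leaves room to reconnect before Heroku's 55-second idle timeout
        self.timeout = timeout
        self.max_consecutive_timeouts = max_consecutive_timeouts  # illustrative default
        self.client_ref = client_ref  # firehose client used to force reconnection
        self.last_heartbeat = time.time()
        self.consecutive_timeouts = 0
        self.should_restart = False
        self.stop_event = threading.Event()
        self.timer = None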

If the last heartbeat is within the threshold, the firehose connection is considered healthy and the system continues processing messages. If the last heartbeat exceeds the threshold, the connection is considered unhealthy and the system attempts to reconnect.

def check_heartbeat(self):
    # Runs on the timer thread to verify a heartbeat arrived recently
    time_since_last = time.time() - self.last_heartbeat

    if time_since_last > self.timeout:
        self.consecutive_timeouts += 1
        logger.warning(
            f"Heartbeat timeout detected: {time_since_last:.1f}s since last beat "
            f"(timeout #{self.consecutive_timeouts})"
        )

        if time_since_last > (self.timeout * 1.5) or self.consecutive_timeouts >= self.max_consecutive_timeouts:
            error_msg = (
                "Connection severely delayed"
                if time_since_last > (self.timeout * 1.5)
                else "Maximum consecutive timeouts reached"
            )
            logger.error(f"{error_msg} ({time_since_last:.1f}s). Forcing immediate reconnection.")
            self.should_restart = True
            self.stop_event.set()
            logger.info("HeartbeatMonitor: Stop event set and should_restart flagged")

            if self.timer:
                self.timer.cancel()
                logger.info("HeartbeatMonitor: Timer cancelled")

            if self.client_ref:
                logger.info("HeartbeatMonitor: Actively triggering client reconnection")
                # Reconnect on a separate thread so the timer callback returns promptly
                threading.Thread(target=self._trigger_reconnect, daemon=True).start()
            return

When a reconnection occurs, the HeartbeatMonitor closes the existing firehose connection and creates a new one with exponential backoff. The backoff starts at 1 second and doubles each time a reconnection attempt fails, up to a maximum of 30 seconds.
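
As a rough sketch, the retry loop might look something like the following; the connect callable and logging details are assumptions for illustration, not the exact code from the fork.

import logging
import time

logger = logging.getLogger(__name__)

def reconnect_with_backoff(connect, max_delay=30):
    # Retry the hypothetical `connect` callable with exponential backoff:
    # 1s, 2s, 4s, ... capped at `max_delay` seconds.
    delay = 1
    while True:
        try:
            connect()  # re-establish the firehose connection; raises on failure
            return
        except Exception as exc:
            logger.warning(f"Reconnect failed ({exc}); retrying in {delay}s")
            time.sleep(delay)
            delay = min(delay * 2, max_delay)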

Next Steps

After implementing the heartbeat monitor, the NBA Now feed has been running reliably without any disconnection issues. The solution provides robust error detection and recovery while maintaining a clean separation from the underlying atproto SDK. The exponential backoff strategy has proven effective at handling temporary network issues without overwhelming the system during outages.

This approach can be easily adapted for other Bluesky feed generators experiencing similar connection stability issues. The complete implementation is available in my fork of the feed generator template, where you can also find additional documentation and usage examples.

If you're building your own Bluesky feed generator, consider implementing a similar heartbeat mechanism to ensure reliable firehose connectivity, especially if you're hosting on platforms with automatic connection timeouts like Heroku.