goose

Package Version Hex Docs

A Gleam WebSocket consumer for AT Protocol Jetstream events.

gleam add goose@1

Quick Start

import goose
import gleam/io

pub fn main() {
  // Use default config with automatic retry logic
  let config = goose.default_config()

  goose.start_consumer(config, fn(json_event) {
    let event = goose.parse_event(json_event)

    case event {
      goose.CommitEvent(did, time_us, commit) -> {
        io.println("Commit: " <> commit.operation <> " in " <> commit.collection)
      }
      goose.IdentityEvent(did, time_us, identity) -> {
        io.println("Identity: " <> identity.handle)
      }
      goose.AccountEvent(did, time_us, account) -> {
        io.println("Account updated: " <> did)
      }
      goose.UnknownEvent(_) -> {
        io.println("Unknown event type")
      }
    }
  })
}

Custom Configuration

import goose
import gleam/option

pub fn main() {
  let config = goose.JetstreamConfig(
    endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
    wanted_collections: ["app.bsky.feed.post", "app.bsky.feed.like"],
    wanted_dids: [],
    cursor: option.None,
    max_message_size_bytes: option.None,
    compress: False,
    require_hello: False,
    // Retry configuration
    max_backoff_seconds: 60,      // Max wait between retries
    log_connection_events: True,  // Log connects/disconnects
    log_retry_attempts: False,    // Skip verbose retry logs
  )

  goose.start_consumer(config, handle_event)
}

Configuration Options

Note: Goose automatically handles connection failures with exponential backoff retry logic (1s, 2s, 4s, 8s, 16s, 32s, and so on, capped at max_backoff_seconds). All connections automatically retry on failure, reconnect on disconnection, and distinguish between harmless timeouts and real errors.

wanted_collections

A list of collection NSIDs used to filter which records you receive (default: empty list = all collections). An entry ending in .* acts as an NSID prefix and matches every collection beneath it.

Example:

wanted_collections: ["app.bsky.feed.post", "app.bsky.graph.*"]

wanted_dids

A list of repo DIDs used to filter which records you receive (default: empty list = all repos)

Example:

wanted_dids: ["did:plc:example123", "did:plc:example456"]

cursor

A Unix timestamp, in microseconds, to begin playback from

Example:

cursor: option.Some(1234567890123456)

max_message_size_bytes

The maximum size, in bytes, of a payload that this client would like to receive

Example:

max_message_size_bytes: option.Some(1048576)  // 1MB limit

compress

Enable zstd compression for WebSocket frames

Example:

compress: True

Note: Compression is transparent to your application - compressed messages are automatically decompressed before being passed to your event handler. The bandwidth savings occur on the wire between the server and your client.

require_hello

Pause replay/live-tail until the server receives a SubscriberOptionsUpdatePayload from this client

Example:

require_hello: True

max_backoff_seconds

Maximum wait time in seconds between retry attempts

Example:

max_backoff_seconds: 120  // Allow up to 2 minute waits between retries

log_connection_events

Whether to log connection state changes (connected, disconnected)

Example:

log_connection_events: True

log_retry_attempts

Whether to log detailed retry attempt information

Example:

log_retry_attempts: False  // Production: skip verbose retry logs

Full Configuration Example

import goose
import gleam/option

let config = goose.JetstreamConfig(
  endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
  wanted_collections: ["app.bsky.feed.post", "app.bsky.graph.*"],
  wanted_dids: ["did:plc:example123"],
  cursor: option.Some(1234567890123456),
  max_message_size_bytes: option.Some(2097152),  // 2MB
  compress: True,
  require_hello: False,
  max_backoff_seconds: 60,
  log_connection_events: True,
  log_retry_attempts: False,
)

goose.start_consumer(config, handle_event)

Further documentation can be found at https://hexdocs.pm/goose.

Development

gleam build # Build the project
gleam test  # Run the tests
Search Document