dream/services/broadcaster

Types

Publish/subscribe broadcaster for concurrent consumers

This module implements a small publish/subscribe (pub/sub) service using Gleam/OTP actors. It is a good fit for broadcasting messages to many WebSocket connections, but it can also be used for any fan-out messaging inside your application.

The flow is:

  1. Start a Broadcaster(message) with start_broadcaster/0 at application startup and store it in your Services type.
  2. Each consumer calls subscribe/1 to obtain a Channel(message).
  3. Convert the channel to a Selector(message) with channel_to_selector/1 when you want to receive messages in a select loop (for example inside a WebSocket handler).
  4. Call publish/2 from anywhere in your app to broadcast a message to all current subscribers.

For more complex routing, per-room partitioning, or persistence, treat this as a building block and layer additional actor logic on top.
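The per-room partitioning mentioned above could be layered on roughly as follows. This is a minimal sketch and not part of this module: the ChatMessage type, the Rooms alias, and the start_rooms and publish_to_room helpers are hypothetical names, and it assumes one broadcaster per room name kept in a gleam/dict dictionary.

```gleam
import dream/services/broadcaster.{type Broadcaster}
import gleam/dict.{type Dict}
import gleam/list
import gleam/otp/actor
import gleam/result

// Hypothetical message type, used only for this sketch.
pub type ChatMessage {
  ChatMessage(text: String)
}

// Hypothetical alias: one broadcaster per room name.
pub type Rooms =
  Dict(String, Broadcaster(ChatMessage))

// Start one broadcaster per room name; stops at the first start failure.
pub fn start_rooms(names: List(String)) -> Result(Rooms, actor.StartError) {
  names
  |> list.try_map(fn(name) {
    broadcaster.start_broadcaster()
    |> result.map(fn(bus) { #(name, bus) })
  })
  |> result.map(dict.from_list)
}

// Publish to a single room; unknown room names are silently ignored.
pub fn publish_to_room(
  rooms: Rooms,
  room: String,
  message: ChatMessage,
) -> Nil {
  case dict.get(rooms, room) {
    Ok(bus) -> broadcaster.publish(bus, message)
    Error(Nil) -> Nil
  }
}
```

Keeping a separate broadcaster per room means subscribers only ever see messages for rooms they joined, at the cost of one actor per room.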

Example

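// In services.gleam
import dream/services/broadcaster

// Example message type; replace with your application's own.
pub type ChatMessage {
  ChatMessage(text: String)
}

pub type Services {
  Services(chat_bus: broadcaster.Broadcaster(ChatMessage))
}

pub fn initialize() -> Services {
  // Crash at startup if the broadcaster cannot be started.
  let assert Ok(chat_bus) = broadcaster.start_broadcaster()
  Services(chat_bus: chat_bus)
}

// In WebSocket init handler
// Needs: import gleam/option.{Some}
// initial_state stands in for your handler's own state value.
fn handle_init(conn, services: Services) {
  let channel = broadcaster.subscribe(services.chat_bus)
  let selector = broadcaster.channel_to_selector(channel)
  #(initial_state, Some(selector))
}

// Anywhere in your app
broadcaster.publish(services.chat_bus, ChatMessage("hello"))

pub opaque type Broadcaster(message)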

A channel receives published messages from a broadcaster.

pub opaque type Channel(message)

Values

pub fn channel_to_selector(
  channel: Channel(message),
) -> process.Selector(message)

Convert a channel to a process.Selector(message).

The selector lets published messages be received in a select loop, such as a WebSocket handler’s event loop.
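A handler often has its own event type that is wider than the broadcast message. One way to fit a channel into such a handler is to map the selector, as sketched below. ChatMessage, HandlerEvent, and handler_selector are hypothetical names, and the sketch assumes process.map_selector from gleam/erlang/process is available in your version of gleam_erlang.

```gleam
import dream/services/broadcaster.{type Broadcaster}
import gleam/erlang/process

// Hypothetical message type, used only for this sketch.
pub type ChatMessage {
  ChatMessage(text: String)
}

// Hypothetical event type for a handler that reacts to more than broadcasts.
pub type HandlerEvent {
  Broadcast(ChatMessage)
  Tick
}

// Subscribe, then wrap every broadcast message in the Broadcast constructor
// so the selector yields HandlerEvent values.
pub fn handler_selector(
  bus: Broadcaster(ChatMessage),
) -> process.Selector(HandlerEvent) {
  bus
  |> broadcaster.subscribe
  |> broadcaster.channel_to_selector
  |> process.map_selector(Broadcast)
}
```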

Example

let channel = broadcaster.subscribe(my_broadcaster)
let selector = broadcaster.channel_to_selector(channel)
#(initial_state, Some(selector))

pub fn publish(
  broadcaster: Broadcaster(message),
  message: message,
) -> Nil

Publish a message to all current subscribers.

The message is sent to every channel that is subscribed to this broadcaster.

Example

broadcaster.publish(my_broadcaster, UserJoined("Alice"))

pub fn start_broadcaster() -> Result(
  Broadcaster(message),
  actor.StartError,
)

Start a new broadcaster service.

Returns an Error holding an actor.StartError if the broadcaster fails to start.

Example

// Named my_broadcaster so the variable does not shadow the broadcaster module.
let assert Ok(my_broadcaster) = broadcaster.start_broadcaster()

pub fn subscribe(
  broadcaster: Broadcaster(message),
) -> Channel(message)

Subscribe to receive messages from the broadcaster.

Returns a Channel(message) that receives every message published to the broadcaster until it is unsubscribed or its process terminates.

Example

let channel = broadcaster.subscribe(my_broadcaster)
let selector = broadcaster.channel_to_selector(channel)

pub fn unsubscribe(
  broadcaster: Broadcaster(message),
  channel: Channel(message),
) -> Nil

Unsubscribe a channel from the broadcaster.

The channel will no longer receive published messages.

Note: this is typically not needed, because channels are cleaned up automatically when their processes terminate.

Example

broadcaster.unsubscribe(my_broadcaster, channel)
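
Where a subscription should last only for one piece of work inside a long-lived process, explicit cleanup can be wrapped in a helper. This is a minimal sketch that uses only subscribe and unsubscribe from this module; with_subscription is a hypothetical name and not part of the API.

```gleam
import dream/services/broadcaster.{type Broadcaster, type Channel}

// Hypothetical helper: subscribe, run the callback, then unsubscribe.
// If the callback crashes, the unsubscribe call is never reached and
// cleanup is left to process termination.
pub fn with_subscription(
  bus: Broadcaster(message),
  run: fn(Channel(message)) -> output,
) -> output {
  let channel = broadcaster.subscribe(bus)
  let outcome = run(channel)
  broadcaster.unsubscribe(bus, channel)
  outcome
}
```

Because the callback is the last argument, the helper can also be called with Gleam's use syntax: use channel <- with_subscription(my_broadcaster).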