dream/services/broadcaster
Types
Publish/subscribe broadcaster for concurrent consumers
This module implements a small publish/subscribe (pub/sub) service using Gleam/OTP actors. It is a good fit for broadcasting messages to many WebSocket connections, but it can also be used for any fan-out messaging inside your application.
The flow is:
- Start a Broadcaster(message) at application startup and store it in your Services type.
- Each consumer calls subscribe/1 to obtain a Channel(message).
- Convert the channel to a Selector(message) with channel_to_selector/1 when you want to receive messages in a select loop (for example inside a WebSocket handler).
- Call publish/2 from anywhere in your app to broadcast a message to all current subscribers.
For more complex routing, per-room partitioning, or persistence, treat this as a building block and layer additional actor logic on top.
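As one way to picture the layering mentioned above, here is a minimal sketch of per-room partitioning that keeps one broadcaster per room name. The Rooms type, the ChatMessage type, and the get_or_start and publish_to_room helpers are hypothetical names invented for this illustration; only start_broadcaster and publish come from this module.

```gleam
import dream/services/broadcaster.{type Broadcaster}
import gleam/dict.{type Dict}
import gleam/otp/actor
import gleam/result

// Hypothetical message type, used only for this illustration.
pub type ChatMessage {
  ChatMessage(text: String)
}

// Hypothetical wrapper: one broadcaster per room name.
pub type Rooms {
  Rooms(by_name: Dict(String, Broadcaster(ChatMessage)))
}

pub fn new_rooms() -> Rooms {
  Rooms(by_name: dict.new())
}

// Return the broadcaster for a room, starting one if the room is new.
// The updated Rooms value is returned alongside it because Gleam data
// is immutable.
pub fn get_or_start(
  rooms: Rooms,
  name: String,
) -> Result(#(Rooms, Broadcaster(ChatMessage)), actor.StartError) {
  case dict.get(rooms.by_name, name) {
    Ok(bus) -> Ok(#(rooms, bus))
    Error(Nil) -> {
      use bus <- result.map(broadcaster.start_broadcaster())
      #(Rooms(by_name: dict.insert(rooms.by_name, name, bus)), bus)
    }
  }
}

// Publish to a single room. Unknown rooms are ignored.
pub fn publish_to_room(
  rooms: Rooms,
  name: String,
  message: ChatMessage,
) -> Nil {
  case dict.get(rooms.by_name, name) {
    Ok(bus) -> broadcaster.publish(bus, message)
    Error(Nil) -> Nil
  }
}
```

Because Rooms is an immutable value, sharing it between processes would require holding it in the state of an actor of your own. That part is left out of the sketch.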
Example
// In services.gleam
pub type Services {
  Services(chat_bus: broadcaster.Broadcaster(ChatMessage))
}

pub fn initialize() -> Services {
  let assert Ok(chat_bus) = broadcaster.start_broadcaster()
  Services(chat_bus: chat_bus)
}

// In WebSocket init handler
fn handle_init(conn, services: Services) {
  let channel = broadcaster.subscribe(services.chat_bus)
  let selector = broadcaster.channel_to_selector(channel)
  #(initial_state, Some(selector))
}

// Anywhere in your app
broadcaster.publish(services.chat_bus, ChatMessage("hello"))
pub opaque type Broadcaster(message)
Values
pub fn channel_to_selector(
channel: Channel(message),
) -> process.Selector(message)
Convert a channel to a selector for use in WebSocket message loops.
The resulting process.Selector lets the WebSocket handler’s event loop
receive the messages delivered to the channel.
Example
let channel = broadcaster.subscribe(my_broadcaster)
let selector = broadcaster.channel_to_selector(channel)
#(initial_state, Some(selector))
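Outside a WebSocket handler, the same selector can be polled directly. The sketch below assumes gleam_erlang 1.x, where process.selector_receive takes a selector and a timeout in milliseconds (older releases named this function process.select). It also assumes that published messages are delivered to the process that called subscribe; the module documentation above does not state this explicitly.

```gleam
import dream/services/broadcaster
import gleam/erlang/process
import gleam/io

pub fn main() {
  let assert Ok(bus) = broadcaster.start_broadcaster()

  // Subscribe from the process that will do the receiving.
  let channel = broadcaster.subscribe(bus)
  let selector = broadcaster.channel_to_selector(channel)

  // The message type is inferred as String from this call.
  broadcaster.publish(bus, "hello")

  // Wait up to one second for a published message.
  // Assumption: gleam_erlang 1.x API (process.selector_receive).
  case process.selector_receive(selector, 1000) {
    Ok(text) -> io.println("received: " <> text)
    Error(Nil) -> io.println("no message within 1s")
  }
}
```

The timeout branch is kept because publish returns Nil and gives no delivery confirmation, so a receiver should not assume a message is already waiting.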
pub fn publish(
broadcaster: Broadcaster(message),
message: message,
) -> Nil
Publish a message to all subscribers.
The message will be sent to all channels that have subscribed to this broadcaster.
Example
broadcaster.publish(my_broadcaster, UserJoined("Alice"))
pub fn start_broadcaster() -> Result(
Broadcaster(message),
actor.StartError,
)
Start a new broadcaster service.
Returns an error if the broadcaster fails to start.
Example
let assert Ok(broadcaster) = broadcaster.start_broadcaster()
pub fn subscribe(
broadcaster: Broadcaster(message),
) -> Channel(message)
Subscribe to receive messages from the broadcaster.
Returns a channel that will receive all messages published to the broadcaster.
Example
let channel = broadcaster.subscribe(my_broadcaster)
let selector = broadcaster.channel_to_selector(channel)
pub fn unsubscribe(
broadcaster: Broadcaster(message),
channel: Channel(message),
) -> Nil
Unsubscribe a channel from the broadcaster.
The channel will no longer receive published messages. Note: This is typically not needed as channels are automatically cleaned up when processes terminate.
Example
broadcaster.unsubscribe(my_broadcaster, channel)