topics
Topic-based publish/subscribe for Gleam with automatic dead subscriber cleanup.
Messages can be of any type. Each PubSub(msg) instance is bound to a single message type, and mixing types on one instance is rejected at compile time.
Installation
gleam add topics
Or add the dependency to gleam.toml manually:
[dependencies]
topics = "~> 1.0"
Usage
import gleam/erlang/process
import topics

pub fn main() {
  let assert Ok(ps) = topics.start()
  let subject = process.new_subject()

  topics.subscribe(ps, "events", subject)
  topics.broadcast(ps, "events", "something happened")
  let assert Ok(msg) = process.receive(subject, within: 1000)
  topics.unsubscribe(ps, "events", subject)

  // Inspect topics and subscriber counts.
  // The binding is named topic_names so it does not shadow the topics module.
  let topic_names = topics.list_topics(ps)
  let count = topics.subscriber_count(ps, "events")
}
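The `let assert` on `process.receive` above crashes the calling process if nothing arrives within the timeout. Where that is not wanted, the `Result` can be matched explicitly. The following is a minimal sketch; `receive_or` is a hypothetical helper and not part of the topics package, and it relies only on `process.receive` from gleam_erlang.

```gleam
import gleam/erlang/process.{type Subject}

/// Hypothetical helper: wait up to `timeout` milliseconds for a message
/// and return `fallback` if none arrives in time.
pub fn receive_or(subject: Subject(a), timeout: Int, fallback: a) -> a {
  case process.receive(subject, within: timeout) {
    // A message was delivered before the timeout elapsed
    Ok(message) -> message
    // The timeout elapsed with nothing in the mailbox for this subject
    Error(Nil) -> fallback
  }
}
```

A subscriber could call `receive_or(subject, 1000, "no event")` in place of the `let assert` line.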
Generic messages
The topics package is generic over the message type. The type is inferred from usage:
// PubSub(Int) — inferred from the broadcast call
let assert Ok(int_ps) = topics.start()
let sub = process.new_subject()
topics.subscribe(int_ps, "counts", sub)
topics.broadcast(int_ps, "counts", 42)
Custom types work the same way:
type Event {
  UserJoined(name: String)
  UserLeft(name: String)
}
let assert Ok(event_ps) = topics.start()
let sub = process.new_subject()
topics.subscribe(event_ps, "lobby", sub)
topics.broadcast(event_ps, "lobby", UserJoined("Alice"))
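On the receiving side, a custom message type lets the subscriber pattern match on each variant, and the compiler checks that every variant is handled. The sketch below assumes the `Event` type from the example. `describe` and `drain` are hypothetical helpers, and events are sent with `process.send` directly so the snippet does not depend on the topics package.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/list

pub type Event {
  UserJoined(name: String)
  UserLeft(name: String)
}

/// Turn an event into a log line. The case must cover every variant.
pub fn describe(event: Event) -> String {
  case event {
    UserJoined(name) -> name <> " joined"
    UserLeft(name) -> name <> " left"
  }
}

/// Collect descriptions of every event already waiting on the subject,
/// in arrival order. A timeout of 0 returns as soon as the mailbox is empty.
pub fn drain(subject: Subject(Event), acc: List(String)) -> List(String) {
  case process.receive(subject, within: 0) {
    Ok(event) -> drain(subject, [describe(event), ..acc])
    Error(Nil) -> list.reverse(acc)
  }
}
```

In a real subscriber, the subject passed to `drain` would be the one registered with `topics.subscribe`.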
Supervisor integration
Use start_linked to embed the topics server in a supervision tree:
import gleam/otp/static_supervisor as supervisor
import topics

pub fn main() {
  let assert Ok(_) =
    // OneForOne restarts only the child that failed
    supervisor.new(supervisor.OneForOne)
    |> supervisor.add(topics.start_linked)
    |> supervisor.start()
  // topics is now supervised and will be restarted on failure
}
Known limitations
- Single-node only. No support for distributed Erlang clusters. All publishers and subscribers must run on the same BEAM node.
- Sequential broadcast. Messages are delivered to subscribers sequentially within each broadcast call. Latency grows linearly with subscriber count.
- Fire-and-forget delivery. No delivery acknowledgement or guarantee. Messages sent to a subscriber that has exited but has not yet been cleaned up are silently dropped.
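Because delivery is fire-and-forget, an application that needs confirmation has to layer acknowledgements on top of the messages it broadcasts. One possible approach, sketched here under assumptions, is to carry a reply subject inside each message. `Tracked`, `handle`, and `await_ack` are hypothetical names and not part of the topics package. Only `process.send` and `process.receive` from gleam_erlang are used.

```gleam
import gleam/erlang/process.{type Subject}

/// Hypothetical wrapper: the payload plus an id and a subject to acknowledge on.
pub type Tracked(payload) {
  Tracked(id: Int, payload: payload, ack_to: Subject(Int))
}

/// Subscriber side: acknowledge the message by id, then return its payload.
pub fn handle(message: Tracked(payload)) -> payload {
  process.send(message.ack_to, message.id)
  message.payload
}

/// Publisher side: wait up to `timeout` milliseconds for the next
/// acknowledgement and report whether it matches `id`.
pub fn await_ack(acks: Subject(Int), id: Int, timeout: Int) -> Bool {
  case process.receive(acks, within: timeout) {
    Ok(acked) -> acked == id
    Error(Nil) -> False
  }
}
```

The publisher would broadcast `Tracked` values on a `PubSub(Tracked(String))` and call `await_ack` once per expected subscriber. This adds a round trip per message, so it only suits topics where confirmation matters more than throughput.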
License
Apache-2.0