topics

Topic-based publish/subscribe for Gleam with automatic dead subscriber cleanup.

Messages can be of any type — each PubSub(msg) instance is isolated by the message type, enforced at compile time.

Installation

gleam add topics
[dependencies]
topics = "~> 1.0"

Usage

import gleam/erlang/process
import topics

pub fn main() {
  let assert Ok(ps) = topics.start()

  let subject = process.new_subject()
  topics.subscribe(ps, "events", subject)
  topics.broadcast(ps, "events", "something happened")

  let assert Ok(msg) = process.receive(subject, within: 1000)

  topics.unsubscribe(ps, "events", subject)

  // Inspect topics and subscriber counts
  let topics = topics.list_topics(ps)
  let count = topics.subscriber_count(ps, "events")
}

Generic messages

The topics package is generic over the message type. The type is inferred from usage:

// PubSub(Int) — inferred from the broadcast call
let assert Ok(int_ps) = topics.start()
let sub = process.new_subject()
topics.subscribe(int_ps, "counts", sub)
topics.broadcast(int_ps, "counts", 42)

Custom types work the same way:

type Event {
  UserJoined(name: String)
  UserLeft(name: String)
}

let assert Ok(event_ps) = topics.start()
let sub = process.new_subject()
topics.subscribe(event_ps, "lobby", sub)
topics.broadcast(event_ps, "lobby", UserJoined("Alice"))

Supervisor integration

Use start_linked to embed the topics server in a supervision tree:

import gleam/otp/static_supervisor as supervisor

pub fn main() {
  let assert Ok(_) =
    supervisor.new()
    |> supervisor.add(topics.start_linked)
    |> supervisor.start()

  // topics is now supervised and will be restarted on failure
}

Known limitations

License

Apache-2.0

Search Document