glyn/pubsub

Glyn PubSub - Selector-Based Type-Safe Event Streaming

This module provides a selector-based wrapper around Erlang’s syn PubSub system, enabling distributed event streaming and one-to-many message broadcasting with runtime type safety through dynamic decoding.

Multi-Channel Actor Integration Pattern

PubSub seamlessly composes with other message channels using selectors:

import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/atom
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
import glyn/pubsub
import glyn/registry

// Define your event types
pub type ChatMessage {
  UserJoined(username: String)
  UserLeft(username: String)
  Message(username: String, content: String)
}

pub type MetricEvent {
  CounterIncrement(name: String, value: Int)
  GaugeUpdate(name: String, value: Float)
}

pub type ActorMessage {
  DirectCommand(String)           // Direct commands
  ChatEvent(ChatMessage)          // Chat PubSub events
  MetricEvent(MetricEvent)        // Metrics PubSub events
}

// Create decoders for your event types
fn expect_atom(expected: String) -> decode.Decoder(atom.Atom) {
  use value <- decode.then(atom.decoder())
  case atom.to_string(value) == expected {
    True -> decode.success(value)
    False -> decode.failure(value, "Expected atom: " <> expected)
  }
}

fn chat_message_decoder() -> decode.Decoder(ChatMessage) {
  decode.one_of(
    {
      use _ <- decode.field(0, expect_atom("user_joined"))
      use username <- decode.field(1, decode.string)
      decode.success(UserJoined(username))
    },
    or: [
      {
        use _ <- decode.field(0, expect_atom("message"))
        use username <- decode.field(1, decode.string)
        use content <- decode.field(2, decode.string)
        decode.success(Message(username, content))
      },
      // Add other variants as needed
    ]
  )
}

fn metric_event_decoder() -> decode.Decoder(MetricEvent) {
  decode.one_of(
    {
      use _ <- decode.field(0, expect_atom("counter_increment"))
      use name <- decode.field(1, decode.string)
      use value <- decode.field(2, decode.int)
      decode.success(CounterIncrement(name, value))
    },
    or: [
      {
        use _ <- decode.field(0, expect_atom("gauge_update"))
        use name <- decode.field(1, decode.string)
        use value <- decode.field(2, decode.float)
        decode.success(GaugeUpdate(name, value))
      },
    ]
  )
}

fn start_multi_channel_actor() {
  actor.new_with_initialiser(5000, fn(_) {
    let command_subject = process.new_subject()

    // Create base selector for direct commands
    let base_selector =
      process.new_selector()
      |> process.select_map(command_subject, DirectCommand)

    // Add chat PubSub channel
    let chat_pubsub = pubsub.new(
      scope: "chat_events",
      decoder: chat_message_decoder(),
      error_default: UserJoined("unknown")
    )
    let chat_selector = pubsub.subscribe(chat_pubsub, "general")
    let with_chat = base_selector
      |> process.merge_selector(
        process.map_selector(chat_selector, ChatEvent)
      )

    // Add metrics PubSub channel
    let metrics_pubsub = pubsub.new(
      scope: "metrics_events",
      decoder: metric_event_decoder(),
      error_default: CounterIncrement("unknown", 0)
    )
    let metrics_selector = pubsub.subscribe(metrics_pubsub, "system")
    let final_selector = with_chat
      |> process.merge_selector(
        process.map_selector(metrics_selector, MetricEvent)
      )

    actor.initialised(initial_state)
    |> actor.selecting(final_selector)
    |> actor.returning(command_subject)
    |> Ok
  })
}

// Publishing events to subscribers
let chat_pubsub = pubsub.new(
  scope: "chat_events",
  decoder: chat_message_decoder(),
  error_default: UserJoined("unknown")
)

// Publish a chat message to all subscribers in "general" channel
let assert Ok(subscriber_count) = pubsub.publish(
  chat_pubsub,
  "general",
  Message("alice", "Hello everyone!")
)

// Check how many subscribers received the message
let count = pubsub.subscriber_count(chat_pubsub, "general")

Types

Type-safe PubSub with dynamic decoding

pub opaque type PubSub(message)
pub type PubSubError {
  PublishFailed(String)
}

Constructors

  • PublishFailed(String)

Values

pub fn new(
  scope: String,
  decoder: decode.Decoder(message),
  error_default: message,
) -> PubSub(message)

Create a new PubSub system for a given scope with dynamic decoding

pub fn publish(
  pubsub: PubSub(message),
  group: String,
  message: message,
) -> Result(Int, PubSubError)

Publish a type-safe message to all subscribers of a group

pub fn subscribe(
  pubsub pubsub: PubSub(message),
  group group: String,
) -> process.Selector(message)

Subscribe to a PubSub group and compose into a selector Creates an internal Subject(Dynamic) and uses select_map for type safety

pub fn subscriber_count(
  pubsub: PubSub(message),
  group: String,
) -> Int

Get the count of subscribers for a group

pub fn subscribers(
  pubsub: PubSub(message),
  group: String,
) -> List(process.Pid)

Get list of subscriber PIDs for a group (useful for debugging)

pub fn unsubscribe(pubsub: PubSub(message), group: String) -> Nil

Unsubscribe from a PubSub group

Search Document