glyn/registry

Glyn Registry - Selector-Based Type-Safe Process Registry

This module provides a selector-based wrapper around Erlang’s syn process registry, enabling distributed service discovery and direct process communication with runtime type safety through dynamic decoding.

Multi-Channel Actor Integration Pattern

The registry seamlessly composes with other message channels using selectors:

import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/atom
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
import glyn/registry
import glyn/pubsub

// Define your message types
/// Messages accepted by the service that is registered in the registry
/// in this example.
pub type ServiceMessage {
  /// Order request: carries an order id and the subject a `Bool` reply is sent to.
  ProcessOrder(id: String, reply_with: Subject(Bool))
  /// Status request: the `String` reply is sent to `reply_with`.
  GetStatus(reply_with: Subject(String))
  /// Variant without payload; this example also passes it as the registry's
  /// `error_default`.
  Shutdown
}

/// Events carried on the pubsub channel in this example; each variant holds
/// a `name` string.
pub type SystemEvent {
  ServiceStarted(name: String)
  ServiceStopped(name: String)
}

/// Single message type the actor receives. Each variant wraps the messages of
/// one input channel so that the per-channel selectors can be mapped into this
/// type and merged.
pub type ActorMessage {
  DirectCommand(String)             // Direct commands
  RegistryMessage(ServiceMessage)   // Registry messages (decoded)
  PubSubEvent(SystemEvent)          // PubSub events
}

// Create decoders for your message types
/// Builds a decoder that accepts only the atom whose name equals `expected`.
///
/// Succeeds with the decoded atom on a match; otherwise fails with the
/// message "Expected atom: " followed by the expected name.
fn expect_atom(expected: String) -> decode.Decoder(atom.Atom) {
  decode.then(atom.decoder(), fn(candidate) {
    case atom.to_string(candidate) {
      name if name == expected -> decode.success(candidate)
      _ -> decode.failure(candidate, "Expected atom: " <> expected)
    }
  })
}

/// Decodes a dynamically-typed message back into a `ServiceMessage`.
///
/// On the Erlang target a variant without fields is a bare atom (`shutdown`),
/// while a variant with fields is a tuple whose first element is the tag atom
/// (e.g. `{get_status, ReplyWith}`), so the two shapes need different decoders.
fn service_message_decoder() -> decode.Decoder(ServiceMessage) {
  decode.one_of(
    // `Shutdown` is the atom itself, not a 1-tuple: indexing field 0 would
    // never match it, so decode the atom directly.
    decode.map(expect_atom("shutdown"), fn(_) { Shutdown }),
    or: [
      {
        use _ <- decode.field(0, expect_atom("get_status"))
        use reply_with <- decode.field(1, subject_decoder())
        decode.success(GetStatus(reply_with))
      },
      {
        // NOTE(review): assumes subject_decoder() is generic over the reply
        // type, so it can yield Subject(Bool) here — confirm.
        use _ <- decode.field(0, expect_atom("process_order"))
        use id <- decode.field(1, decode.string)
        use reply_with <- decode.field(2, subject_decoder())
        decode.success(ProcessOrder(id, reply_with))
      },
      // Add a branch here whenever ServiceMessage gains a variant
    ]
  )
}

/// Builds an actor whose selector combines three input channels: a direct
/// command subject, registry messages, and pubsub events. Every channel is
/// mapped into `ActorMessage` before the selectors are merged.
///
/// NOTE(review): `initial_state` and `system_event_decoder` are not defined in
/// this snippet and must be supplied by the surrounding code. The value
/// returned here is whatever `actor.new_with_initialiser` produces; presumably
/// a message handler still has to be attached and the actor started — confirm.
fn start_multi_channel_actor() {
  // 5000 is presumably the initialisation timeout in milliseconds — confirm
  actor.new_with_initialiser(5000, fn(_) {
    let command_subject = process.new_subject()

    // Create base selector for direct commands
    let base_selector =
      process.new_selector()
      |> process.select_map(command_subject, DirectCommand)

    // Add registry channel
    let user_registry = registry.new(
      scope: "user_services",
      decoder: service_message_decoder(),
      error_default: Shutdown
    )
    // Register under the name "order_processor" with "v1.0" as metadata;
    // `let assert` crashes the initialiser if registration returns an error.
    let assert Ok(registry_selector) = registry.register(
      user_registry,
      "order_processor",
      "v1.0"
    )
    // Wrap decoded ServiceMessage values in RegistryMessage so both selectors
    // produce ActorMessage and can be merged.
    let with_registry = base_selector
      |> process.merge_selector(
        process.map_selector(registry_selector, RegistryMessage)
      )

    // Add pubsub channel for system events
    let system_pubsub = pubsub.new(
      scope: "system_events",
      decoder: system_event_decoder(),
      error_default: ServiceStarted("unknown")
    )
    let pubsub_selector = pubsub.subscribe(system_pubsub, "services")
    let final_selector = with_registry
      |> process.merge_selector(
        process.map_selector(pubsub_selector, PubSubEvent)
      )

    // Hand the merged selector to the actor and return the command subject
    // to the caller.
    actor.initialised(initial_state)
    |> actor.selecting(final_selector)
    |> actor.returning(command_subject)
    |> Ok
  })
}

// Send messages to registered services
// (snippet: these statements belong inside a function body; `reply_subject`
// is a Subject(Bool) the caller must create beforehand)
let user_registry = registry.new(
  scope: "user_services",
  decoder: service_message_decoder(),
  error_default: Shutdown
)

// Send a message
// NOTE(review): the receiving side's decoder needs a branch for ProcessOrder;
// presumably a message that fails to decode is replaced by `error_default` — confirm.
let assert Ok(_) = registry.send(
  user_registry,
  "order_processor",
  ProcessOrder("order-123", reply_subject)
)

// Make a call and wait for reply
// `GetStatus(_)` is a function capture: `call` supplies the reply subject.
// 5000 is the `waiting` timeout (presumably milliseconds — confirm).
let assert Ok(status) = registry.call(
  user_registry,
  "order_processor",
  waiting: 5000,
  sending: GetStatus(_)
)

Types

Type-safe process registry with dynamic decoding

pub opaque type Registry(message, metadata)

Errors returned by registry operations (registration, unregistration, lookup, and calls)

pub type RegistryError {
  Timeout
  ProcessNotFound(name: String)
  RegistrationFailed(reason: String)
  UnregistrationFailed(reason: String)
}

Constructors

  • Timeout
  • ProcessNotFound(name: String)
  • RegistrationFailed(reason: String)
  • UnregistrationFailed(reason: String)

Values

pub fn call(
  registry: Registry(message, metadata),
  actor_name: String,
  waiting timeout: Int,
  sending message_fn: fn(process.Subject(reply)) -> message,
) -> Result(reply, RegistryError)

Call a registered process and wait for a reply, similar to actor.call

pub fn new(
  scope scope: String,
  decoder decoder: decode.Decoder(message),
  error_default error_default: message,
) -> Registry(message, metadata)

Create a new Registry system for a given scope with dynamic decoding

pub fn register(
  registry registry: Registry(message, metadata),
  actor_name actor_name: String,
  metadata metadata: metadata,
) -> Result(process.Selector(message), RegistryError)

Register a process with a name and return a selector for receiving messages. Creates an internal Subject(Dynamic) and uses select_map for type safety.

pub fn send(
  registry: Registry(message, metadata),
  actor_name: String,
  message: message,
) -> Result(Nil, RegistryError)

Send a message to a registered process using the stored dynamic subject

pub fn unregister(
  registry: Registry(message, metadata),
  actor_name: String,
) -> Result(Nil, RegistryError)

Unregister a process by name

pub fn whereis(
  registry: Registry(message, metadata),
  actor_name: String,
) -> Result(#(process.Pid, metadata), RegistryError)

Look up a registered process and return PID with metadata

Search Document