glyn/registry
Glyn Registry - Selector-Based Type-Safe Process Registry
This module provides a selector-based wrapper around Erlang’s syn process registry,
enabling distributed service discovery and direct process communication with
runtime type safety through dynamic decoding.
Multi-Channel Actor Integration Pattern
The registry seamlessly composes with other message channels using selectors:
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/atom
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
import glyn/registry
import glyn/pubsub
// Define your message types
/// Messages the example service receives through the registry channel.
pub type ServiceMessage {
/// Carries an order id plus the subject on which a `Bool` reply is expected.
ProcessOrder(id: String, reply_with: Subject(Bool))
/// Carries the subject on which a `String` reply is expected.
GetStatus(reply_with: Subject(String))
/// Field-less variant; the example also passes it as the registry's
/// `error_default`.
Shutdown
}
/// Events the example actor receives through the pubsub channel.
pub type SystemEvent {
/// Event carrying a service `name`.
ServiceStarted(name: String)
/// Event carrying a service `name`.
ServiceStopped(name: String)
}
/// Unified message type of the example actor: one variant per incoming
/// channel, so that all selectors can be merged into a single one.
pub type ActorMessage {
DirectCommand(String) // Sent straight to the actor's own command subject
RegistryMessage(ServiceMessage) // Received via the registry (already decoded)
PubSubEvent(SystemEvent) // Received via the pubsub subscription
}
// Create decoders for your message types
/// Builds a decoder that succeeds only for the atom whose textual name equals
/// `expected`, yielding that atom; any other atom produces a decode failure.
fn expect_atom(expected: String) -> decode.Decoder(atom.Atom) {
  atom.decoder()
  |> decode.then(fn(found) {
    case atom.to_string(found) {
      // The decoded atom carries exactly the name we are looking for.
      name if name == expected -> decode.success(found)
      // Any other atom: report what was expected instead.
      _ -> decode.failure(found, "Expected atom: " <> expected)
    }
  })
}
/// Decodes a raw term received through the registry into a `ServiceMessage`.
///
/// On the Erlang target a variant without fields is a bare atom, while a
/// variant with fields is a tuple whose element 0 is the tag atom. The two
/// shapes therefore need different decoding strategies.
fn service_message_decoder() -> decode.Decoder(ServiceMessage) {
  decode.one_of(
    // `Shutdown` has no fields, so it arrives as the bare atom `shutdown`.
    // Indexing into it with `decode.field(0, ...)` can never succeed; match
    // the term itself instead.
    expect_atom("shutdown") |> decode.map(fn(_) { Shutdown }),
    or: [
      {
        // `GetStatus(reply_with)` arrives as the tuple `{get_status, Subject}`.
        use _ <- decode.field(0, expect_atom("get_status"))
        use reply_with <- decode.field(1, subject_decoder())
        decode.success(GetStatus(reply_with))
      },
      // Add other variants as needed
    ],
  )
}
/// Builds an actor whose initialiser wires three message sources into one
/// selector: a direct command subject, a registry registration and a pubsub
/// subscription. The command subject is what the initialiser returns.
fn start_multi_channel_actor() {
  actor.new_with_initialiser(5000, fn(_) {
    // Channel 1: subject for commands sent directly to this actor
    let command_subject = process.new_subject()

    // Channel 2: register under a name and obtain the registry selector
    let assert Ok(registry_selector) =
      registry.new(
        scope: "user_services",
        decoder: service_message_decoder(),
        error_default: Shutdown,
      )
      |> registry.register("order_processor", "v1.0")

    // Channel 3: subscribe to system events
    let pubsub_selector =
      pubsub.new(
        scope: "system_events",
        decoder: system_event_decoder(),
        error_default: ServiceStarted("unknown"),
      )
      |> pubsub.subscribe("services")

    // Map every channel into `ActorMessage` and merge them into one selector
    let combined_selector =
      process.new_selector()
      |> process.select_map(command_subject, DirectCommand)
      |> process.merge_selector(process.map_selector(
        registry_selector,
        RegistryMessage,
      ))
      |> process.merge_selector(process.map_selector(
        pubsub_selector,
        PubSubEvent,
      ))

    actor.initialised(initial_state)
    |> actor.selecting(combined_selector)
    |> actor.returning(command_subject)
    |> Ok
  })
}
// Send messages to registered services.
// The sender builds a registry handle with the same scope, decoder and
// error default that the receiving actor used when registering.
let user_registry = registry.new(
scope: "user_services",
decoder: service_message_decoder(),
error_default: Shutdown
)
// Send a message to the process registered as "order_processor".
// `let assert` crashes the caller if `send` returns an error.
// NOTE: `reply_subject` is not defined in this snippet; it is assumed to be a
// `Subject(Bool)` created by the caller beforehand.
let assert Ok(_) = registry.send(
user_registry,
"order_processor",
ProcessOrder("order-123", reply_subject)
)
// Make a call and wait for the reply.
// `GetStatus(_)` is a function capture: `call` supplies the reply subject and
// the resulting `GetStatus` message is what gets sent.
// NOTE: the `waiting` value is presumably in milliseconds - confirm.
let assert Ok(status) = registry.call(
user_registry,
"order_processor",
waiting: 5000,
sending: GetStatus(_)
)
Types
Type-safe process registry with dynamic decoding
pub opaque type Registry(message, metadata)
Errors returned by registry operations
pub type RegistryError {
Timeout
ProcessNotFound(name: String)
RegistrationFailed(reason: String)
UnregistrationFailed(reason: String)
}
Constructors
- Timeout
- ProcessNotFound(name: String)
- RegistrationFailed(reason: String)
- UnregistrationFailed(reason: String)
Values
pub fn call(
registry: Registry(message, metadata),
actor_name: String,
waiting timeout: Int,
sending message_fn: fn(process.Subject(reply)) -> message,
) -> Result(reply, RegistryError)
Call a registered process and wait for a reply, similar to actor.call
pub fn new(
scope scope: String,
decoder decoder: decode.Decoder(message),
error_default error_default: message,
) -> Registry(message, metadata)
Create a new Registry system for a given scope with dynamic decoding
pub fn register(
registry registry: Registry(message, metadata),
actor_name actor_name: String,
metadata metadata: metadata,
) -> Result(process.Selector(message), RegistryError)
Register a process with a name and return a selector for receiving messages. Creates an internal Subject(Dynamic) and uses select_map for type safety.
pub fn send(
registry: Registry(message, metadata),
actor_name: String,
message: message,
) -> Result(Nil, RegistryError)
Send a message to a registered process using the stored dynamic subject
pub fn unregister(
registry: Registry(message, metadata),
actor_name: String,
) -> Result(Nil, RegistryError)
Unregister a process by name
pub fn whereis(
registry: Registry(message, metadata),
actor_name: String,
) -> Result(#(process.Pid, metadata), RegistryError)
Look up a registered process and return PID with metadata