distribute
distribute: Typed messaging for the BEAM.
This library provides a safety layer over Erlang’s native distribution. It focuses on hardening the Node Boundary: the physical edge where typed Gleam values become raw binary terms and vice versa.
The Typed Boundary Concept
In standard Erlang distribution, messages cross the wire as raw terms
without type information. distribute forces you to define a protocol
using TypedName and Codec before any data is sent.
- At the Sender: messages are encoded, checked against payload limits, and transmitted via BEAM distribution.
- At the Receiver: binaries have their size verified before decoding. If they exceed limits, they are rejected. If they are valid, they are decoded into typed Gleam values.
This architecture ensures that an actor’s mailbox is never flooded with unparseable or oversized data, and the compiler can prevent protocol mismatches at the call site.
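The boundary described above can be sketched as a local round trip: a value is encoded on send and decoded on receive through the same Codec. This is a minimal sketch, not the library's reference usage. It assumes a GlobalSubject can be sent to from the process that owns it, and `round_trip` is a hypothetical helper name; the 1000 ms timeout is arbitrary.

```gleam
import distribute
import distribute/codec

/// Hypothetical helper: push an Int through the typed boundary and back.
/// Errors from either side are rendered with the re-exported formatters.
pub fn round_trip(value: Int) -> Result(Int, String) {
  // The Codec fixes the message type of the subject to Int.
  let subject = distribute.new_subject(codec.int())
  case distribute.send(subject, value) {
    Error(err) -> Error(distribute.send_error_to_string(err))
    Ok(Nil) ->
      // Assumption: the owning process can receive what it just sent.
      case distribute.receive_with_timeout(subject, 1000) {
        Ok(received) -> Ok(received)
        Error(err) -> Error(distribute.decode_error_to_string(err))
      }
  }
}
```

Because the subject is typed by its Codec, passing a String to `distribute.send` here would be rejected at compile time rather than at the receiver.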
Core Philosophy
- Fail-Fast over Retry: every failure (Timeout, TargetDown, DecodeError) is surfaced immediately. Managing retries and circuit breaking is left to the application layer.
- Memory First: payload limits are enforced at the I/O boundary to protect nodes from OOM attacks or bugs.
- OTP Native: every part of the library (supervisors, monitors, subjects) follows standard BEAM OTP patterns.
- Zero Cost Read: configuration is stored in Erlang’s
persistent_term for O(1) reads with zero heap allocation.
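Since retries are left to the application layer, a caller that wants them has to wrap the call itself. The sketch below shows one possible shape. `retry` and `call_with_retry` are hypothetical names, not part of the library, and the wrapper retries on every error, which a real application would likely narrow by matching on specific CallError cases.

```gleam
import distribute
import distribute/codec
import gleam/erlang/process

/// Hypothetical helper: run `attempt` up to `max_attempts` times and
/// return the first Ok, or the last Error once attempts are exhausted.
pub fn retry(
  max_attempts: Int,
  attempt: fn() -> Result(a, e),
) -> Result(a, e) {
  case attempt() {
    Ok(value) -> Ok(value)
    Error(err) ->
      case max_attempts > 1 {
        True -> retry(max_attempts - 1, attempt)
        False -> Error(err)
      }
  }
}

/// Example wiring: at most three attempts of a call that replies with an Int.
pub fn call_with_retry(
  target: distribute.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
) -> Result(Int, distribute.CallError) {
  retry(3, fn() {
    distribute.call(target, make_request, codec.int_decoder())
  })
}
```

The helper has no backoff or circuit breaking; it only illustrates where such policy would live.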
Typical Usage
// 1. Configure once at application startup
let assert Ok(Nil) = distribute.configure(config.default())
// 2. Define a protocol (Name + Codec)
let counter = distribute.named("counter", codec.int())
// 3. Start a globally registered, supervised singleton
let assert Ok(pid) = distribute.start_supervised(counter, 0, handler)
// 4. Resolve the registered subject, then call from any node.
// Monitor-based TargetDown detection, default timeout from config.
let assert Ok(gs) = distribute.lookup(counter)
let assert Ok(val) = distribute.call(gs, Get, codec.int_decoder())
Default vs explicit timeout
Every long-running operation has two shapes:
- The default form reads from config.get(). This is the path you want 90% of the time. A single configure(...) at boot tunes the whole surface (call, receive, start_actor, start_registered, start_supervised, pool, child_spec, call_isolated).
- The _with_timeout form is the explicit override. Pick it only when the request has a hard deadline that diverges from config.default_*.
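The two shapes can be placed side by side as follows. This is an illustrative sketch: `get_count`, `get_count_urgent`, and the 250 ms deadline are hypothetical, chosen only to show where the explicit override goes.

```gleam
import distribute
import distribute/codec
import gleam/erlang/process

/// Hypothetical hard deadline for the urgent path, in milliseconds.
pub const urgent_timeout_ms = 250

/// Default form: the timeout comes from the global configuration.
pub fn get_count(
  target: distribute.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
) -> Result(Int, distribute.CallError) {
  distribute.call(target, make_request, codec.int_decoder())
}

/// Explicit form: same call, but bounded by the hard deadline above.
pub fn get_count_urgent(
  target: distribute.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
) -> Result(Int, distribute.CallError) {
  distribute.call_with_timeout(
    target,
    make_request,
    codec.int_decoder(),
    urgent_timeout_ms,
  )
}
```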
Error handling without extra imports
Every public error type (and its *_to_string formatter) is
re-exported here. Pattern-matching on CallError, RegisterError,
LookupError, etc., requires only import distribute.
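As a small illustration of the single-import claim, the sketch below logs a failed call using only `import distribute` for the error type and its formatter. `log_call_failure` is a hypothetical helper name, and printing to standard output is just a stand-in for whatever logging the application uses.

```gleam
import distribute
import gleam/io

/// Hypothetical helper: log a failed call and pass the result through
/// unchanged, so it can sit in the middle of a pipeline.
pub fn log_call_failure(
  result: Result(a, distribute.CallError),
) -> Result(a, distribute.CallError) {
  case result {
    Ok(_) -> Nil
    Error(err) ->
      io.println("call failed: " <> distribute.call_error_to_string(err))
  }
  result
}
```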
Reply-Subject helpers live in distribute/receiver
Handlers that answer a call typically need receiver.receive_typed
or receiver.selecting_typed to operate on the raw reply-Subject.
These are intra-handler primitives: import the module directly.
import distribute/receiver
case receiver.receive_typed(reply_to, codec.int_decoder(), 1000) {
Ok(value) -> ...
Error(receiver.ReceiveTimeout) -> ...
Error(receiver.DecodeError(_)) -> ...
}
API surface, by concern
- Boot & config: configure, get_config, version, Config, ConfigError, config_error_to_string.
- Node lifecycle: start_node, connect, nodes, self_node, is_distributed, has_peers, health, ClusterHealth, NodeStartError, ConnectError, node_start_error_to_string, connect_error_to_string.
- Cluster events: start_monitor, subscribe, unsubscribe, ClusterEvent, MonitorMessage.
- Protocol: named, new_subject, register, lookup, unregister, unregister_typed, TypedName, GlobalSubject, RegisterError, UnregisterError, LookupError, register_error_to_string, unregister_error_to_string, lookup_error_to_string.
- Actor lifecycle: start_actor / start_actor_with_timeout / start_actor_observed, start_registered / start_registered_with_timeout, start_supervised / start_supervised_with_timeout, pool / pool_with_timeout, child_spec / child_spec_with_timeout, HandlerStep, ActorStartError, StartRegisteredError, actor_start_error_to_string, start_registered_error_to_string.
- Messaging: send, reply, receive / receive_with_timeout, call / call_with_timeout, call_isolated / call_isolated_with_timeout, SendError, CallError, ReceiveError, EncodeError, DecodeError, send_error_to_string, call_error_to_string, receive_error_to_string, encode_error_to_string, decode_error_to_string.
Most users only need import distribute. Low-level modules
(distribute/actor, distribute/global, distribute/registry,
distribute/codec, distribute/receiver, distribute/cluster,
distribute/cluster_monitor, distribute/codec/composite,
distribute/codec/tagged, distribute/config) remain available for
advanced cases not covered by the facade, e.g. whereis,
register_pid, register_typed, from_pid, from_subject,
lookup_with_timeout, lookup_async, start_registered_observed,
receive_typed, selecting_typed.
Types
pub type ActorStartError =
actor.StartError
pub type CallError =
global.CallError
pub type ClusterEvent =
cluster_monitor.ClusterEvent
pub type ClusterHealth =
cluster.ClusterHealth
pub type Config =
config.Config
pub type ConfigError =
config.ConfigError
pub type ConnectError =
cluster.ConnectError
pub type DecodeError =
codec.DecodeError
pub type EncodeError =
codec.EncodeError
pub type GlobalSubject(msg) =
global.GlobalSubject(msg)
pub type HandlerStep(state) =
receiver.HandlerStep(state)
pub type LookupError =
registry.LookupError
pub type MonitorMessage =
cluster_monitor.Message
pub type NodeStartError =
cluster.StartError
pub type ReceiveError =
receiver.ReceiveError
pub type RegisterError =
registry.RegisterError
pub type SendError =
global.SendError
pub type StartRegisteredError =
actor.StartRegisteredError
pub type TelemetryEvent =
telemetry.Event
pub type TelemetrySink =
fn(telemetry.Event) -> Nil
pub type TypedName(msg) =
registry.TypedName(msg)
pub type UnregisterError =
registry.UnregisterError
Values
pub fn actor_start_error_to_string(
err: actor.StartError,
) -> String
Render a gleam_otp/actor.StartError as a human-readable string.
pub fn call(
target: global.GlobalSubject(req),
make_request: fn(process.Subject(BitArray)) -> req,
response_decoder: fn(BitArray) -> Result(
resp,
codec.DecodeError,
),
) -> Result(resp, global.CallError)
Synchronous request/response with monitor-based TargetDown detection.
Uses config.get().default_call_timeout_ms. Use call_with_timeout/4
for an explicit timeout.
Late-reply caveat: choose call_isolated for long-running callers
call is the cheap default. It is safe for short-lived callers
(CLI tools, request handlers, scripts) whose process exits shortly
after the call returns: orphan late-replies die with the process.
It is not the right choice for long-lived processes (OTP
actors, supervisors, manager loops) that issue many calls under
sustained timeouts. A reply that arrives after call returns
Error(Timeout) cannot be evicted from the caller’s mailbox by
the BEAM (no erlang:alias/0-aware Subject layout in the current
gleam_erlang); selective receive scans every orphan on every
subsequent process.receive, and tens of thousands of orphans
quietly degrade the caller’s throughput.
For that shape, prefer call_isolated/3: it runs each call inside
a short-lived unlinked proxy process whose mailbox is reaped on
exit. See global.call/4 and docs/safety_and_limits.md for the
full design rationale.
pub fn call_error_to_string(err: global.CallError) -> String
pub fn call_isolated(
target: global.GlobalSubject(req),
make_request: fn(process.Subject(BitArray)) -> req,
response_decoder: fn(BitArray) -> Result(
resp,
codec.DecodeError,
),
) -> Result(resp, global.CallError)
Mailbox-safe variant of call using
config.get().default_call_timeout_ms. Each invocation runs inside
a short-lived unlinked proxy process so orphan late-replies die
with the proxy instead of polluting the caller’s mailbox.
Recommended for long-running callers issuing many RPCs under
sustained timeouts. See global.call_isolated for the full design
rationale.
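A long-lived caller of the kind described above might look like the following polling loop. This is a sketch under assumptions: `poll_counter` is a hypothetical function, the target is assumed to reply with an Int, and failures are simply skipped rather than handled.

```gleam
import distribute
import distribute/codec
import gleam/erlang/process

/// Hypothetical polling loop: issue `remaining` isolated calls and count
/// how many succeeded. Each call runs in its own proxy process, so a late
/// reply after a timeout never lands in this process's mailbox.
pub fn poll_counter(
  target: distribute.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  remaining: Int,
  successes: Int,
) -> Int {
  case remaining <= 0 {
    True -> successes
    False ->
      case distribute.call_isolated(target, make_request, codec.int_decoder()) {
        Ok(_) ->
          poll_counter(target, make_request, remaining - 1, successes + 1)
        Error(_) ->
          poll_counter(target, make_request, remaining - 1, successes)
      }
  }
}
```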
pub fn call_isolated_with_timeout(
target: global.GlobalSubject(req),
make_request: fn(process.Subject(BitArray)) -> req,
response_decoder: fn(BitArray) -> Result(
resp,
codec.DecodeError,
),
timeout_ms: Int,
) -> Result(resp, global.CallError)
Like call_isolated, with an explicit timeout.
pub fn call_with_timeout(
target: global.GlobalSubject(req),
make_request: fn(process.Subject(BitArray)) -> req,
response_decoder: fn(BitArray) -> Result(
resp,
codec.DecodeError,
),
timeout_ms: Int,
) -> Result(resp, global.CallError)
Like call, with an explicit timeout. Inherits the same
late-reply caveat. See call/3.
pub fn child_spec(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> supervision.ChildSpecification(global.GlobalSubject(msg))
OTP child spec using config.get().default_init_timeout_ms.
pub fn child_spec_with_timeout(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
init_timeout_ms: Int,
) -> supervision.ChildSpecification(global.GlobalSubject(msg))
Like child_spec, with an explicit init timeout.
pub fn config_error_to_string(err: config.ConfigError) -> String
pub fn configure(
cfg: config.Config,
) -> Result(Nil, config.ConfigError)
Set global runtime configuration. Call once at application startup.
pub fn connect(node: String) -> Result(Nil, cluster.ConnectError)
pub fn connect_error_to_string(
err: cluster.ConnectError,
) -> String
pub fn decode_error_to_string(err: codec.DecodeError) -> String
pub fn encode_error_to_string(err: codec.EncodeError) -> String
pub fn get_config() -> config.Config
Read the current global configuration (or defaults if never configured).
pub fn has_peers() -> Bool
Whether this node has at least one connected peer. Not a health
check. A single-node deployment is operationally fine and will
return False here. Use this only to gate cluster-wide operations.
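One way to use this as a gate, rather than as a health signal, is to classify the node's topology before attempting cluster-wide work. The function names and the three labels below are hypothetical.

```gleam
import distribute

/// Hypothetical classification of the node's topology. Kept pure so the
/// decision logic can be checked without a running cluster.
pub fn describe_topology(is_distributed: Bool, has_peers: Bool) -> String {
  case is_distributed, has_peers {
    False, _ -> "local-only"
    True, False -> "distributed, no peers"
    True, True -> "clustered"
  }
}

/// Reads the live flags from the library and classifies them.
pub fn current_topology() -> String {
  describe_topology(distribute.is_distributed(), distribute.has_peers())
}
```

A caller would typically skip cluster-wide operations unless the result is "clustered", while treating the other two cases as normal operation.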
pub fn health() -> cluster.ClusterHealth
Full cluster health snapshot with parallel pings.
pub fn install_telemetry(sink: fn(telemetry.Event) -> Nil) -> Nil
Install (or replace) the global telemetry sink for observability.
This single opt-in sink receives all load-bearing events from the library
(registry, atom budget, payload limits, codec failures, timeouts).
See distribute/telemetry for the full semantics and event structure.
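A minimal sink might simply print every event. The sketch below makes no assumption about the event's structure and renders it with `string.inspect`; `install_logging_sink` is a hypothetical name, and a production sink would more likely forward events to a metrics or logging system.

```gleam
import distribute
import gleam/io
import gleam/string

/// Hypothetical setup function: install a sink that prints each event.
/// The event is rendered generically because its fields are not assumed.
pub fn install_logging_sink() -> Nil {
  distribute.install_telemetry(fn(event) {
    io.println("distribute event: " <> string.inspect(event))
  })
}
```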
pub fn is_distributed() -> Bool
Whether this node is running BEAM distribution (via erlang:is_alive/0).
pub fn lookup(
typed_name: registry.TypedName(msg),
) -> Result(global.GlobalSubject(msg), Nil)
Look up a GlobalSubject by its TypedName. For polling variants
(blocking and async) see registry.lookup_with_timeout /
registry.lookup_async.
pub fn lookup_error_to_string(
err: registry.LookupError,
) -> String
pub fn named(
name: String,
c: codec.Codec(msg),
) -> registry.TypedName(msg)
Create a TypedName from a bundled Codec.
let counter = distribute.named("counter", codec.int())
pub fn new_subject(
c: codec.Codec(msg),
) -> global.GlobalSubject(msg)
Create a new GlobalSubject owned by the current process, from a
bundled Codec. For separate encoder/decoder, drop down to
global.new directly.
pub fn node_start_error_to_string(
err: cluster.StartError,
) -> String
pub fn pool(
typed_name: registry.TypedName(msg),
size: Int,
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(process.Pid, actor.StartError)
Start N supervised actors using config.get().default_init_timeout_ms.
pub fn pool_with_timeout(
typed_name: registry.TypedName(msg),
size: Int,
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
init_timeout_ms: Int,
) -> Result(process.Pid, actor.StartError)
Like pool, with an explicit init timeout.
pub fn receive(
subject: global.GlobalSubject(msg),
) -> Result(msg, codec.DecodeError)
Receive a typed message using config.get().default_call_timeout_ms.
Use receive_with_timeout/2 for an explicit timeout.
pub fn receive_error_to_string(
err: receiver.ReceiveError,
) -> String
pub fn receive_with_timeout(
subject: global.GlobalSubject(msg),
timeout_ms: Int,
) -> Result(msg, codec.DecodeError)
Like receive, with an explicit timeout.
pub fn register(
typed_name: registry.TypedName(msg),
subject: global.GlobalSubject(msg),
) -> Result(Nil, registry.RegisterError)
Register a GlobalSubject under its TypedName. The typed pair is
the recommended path; for raw-PID registration import
distribute/registry and call registry.register/2 directly.
pub fn register_error_to_string(
err: registry.RegisterError,
) -> String
pub fn reply(
reply_to: process.Subject(BitArray),
response: resp,
encoder: fn(resp) -> Result(BitArray, codec.EncodeError),
) -> Result(Nil, global.SendError)
Send a response through a reply subject. Used by handlers to answer a call.
pub fn send(
subject: global.GlobalSubject(msg),
message: msg,
) -> Result(Nil, global.SendError)
pub fn send_error_to_string(err: global.SendError) -> String
pub fn start_actor(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(global.GlobalSubject(msg), actor.StartError)
Start a named actor using config.get().default_init_timeout_ms.
Use start_actor_with_timeout/4 for an explicit timeout.
pub fn start_actor_observed(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
on_decode_error: fn(codec.DecodeError) -> Nil,
) -> Result(global.GlobalSubject(msg), actor.StartError)
Start a named actor with a decode-error callback. Useful for
logging or metering malformed messages across nodes (e.g. during
rolling deploys with mismatched codec versions). Uses
config.get().default_init_timeout_ms. If you need a custom
init timeout, drop down to actor.start_observed directly.
pub fn start_actor_with_timeout(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
init_timeout_ms: Int,
) -> Result(global.GlobalSubject(msg), actor.StartError)
Start a named actor with an explicit init timeout.
pub fn start_monitor() -> Result(
process.Subject(cluster_monitor.Message),
actor.StartError,
)
Start the cluster monitor actor. It listens for Erlang node events and broadcasts them to all Gleam subscribers.
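Starting the monitor and attaching a listener could be combined as sketched below. `watch_cluster` is a hypothetical helper. It returns both subjects so the caller can later pass them to unsubscribe, and it assumes the calling process is the one that will receive the events.

```gleam
import distribute
import gleam/erlang/process
import gleam/result

/// Hypothetical helper: start the monitor and subscribe a fresh listener
/// owned by the calling process.
pub fn watch_cluster() -> Result(
  #(
    process.Subject(distribute.MonitorMessage),
    process.Subject(distribute.ClusterEvent),
  ),
  distribute.ActorStartError,
) {
  use monitor <- result.map(distribute.start_monitor())
  let listener = process.new_subject()
  distribute.subscribe(monitor, listener)
  #(monitor, listener)
}
```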
pub fn start_node(
name: String,
cookie: String,
) -> Result(Nil, cluster.StartError)
Start a distributed BEAM node.
name must contain @ (e.g. "myapp@127.0.0.1").
cookie must be [a-zA-Z0-9_-]+ and 1..255 bytes (validated by FFI).
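Starting a node and joining a peer are usually done together at boot. The sketch below chains the two and flattens both error types to strings using the re-exported formatters. `join_cluster` is a hypothetical helper, and the node names in the usage note are placeholders.

```gleam
import distribute
import gleam/result

/// Hypothetical boot helper: start this node, then connect to one peer.
/// Stops at the first failure and reports it as a readable string.
pub fn join_cluster(
  name: String,
  cookie: String,
  peer: String,
) -> Result(Nil, String) {
  use _ <- result.try(
    distribute.start_node(name, cookie)
    |> result.map_error(distribute.node_start_error_to_string),
  )
  distribute.connect(peer)
  |> result.map_error(distribute.connect_error_to_string)
}
```

For example, `join_cluster("myapp@127.0.0.1", "my_cookie", "other@127.0.0.1")` would start the local node and then attempt the connection.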
pub fn start_registered(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(global.GlobalSubject(msg), actor.StartRegisteredError)
Start an actor and register it globally in one step. Uses
config.get().default_init_timeout_ms. Use
start_registered_with_timeout/4 for an explicit timeout, or
actor.start_registered_observed for the decode-error hook variant.
pub fn start_registered_error_to_string(
err: actor.StartRegisteredError,
) -> String
pub fn start_registered_with_timeout(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
init_timeout_ms: Int,
) -> Result(global.GlobalSubject(msg), actor.StartRegisteredError)
Like start_registered, with an explicit init timeout.
pub fn start_supervised(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(process.Pid, actor.StartError)
Start a supervised actor using config.get().default_init_timeout_ms.
pub fn start_supervised_with_timeout(
typed_name: registry.TypedName(msg),
initial_state: state,
handler: fn(msg, state) -> receiver.HandlerStep(state),
init_timeout_ms: Int,
) -> Result(process.Pid, actor.StartError)
Like start_supervised, with an explicit init timeout.
pub fn subscribe(
monitor: process.Subject(cluster_monitor.Message),
listener: process.Subject(cluster_monitor.ClusterEvent),
) -> Nil
pub fn unregister(
name: String,
) -> Result(Nil, registry.UnregisterError)
Unregister a globally registered name. Idempotent cleanup paths can
discard the result with let _ = unregister(name).
pub fn unregister_error_to_string(
err: registry.UnregisterError,
) -> String
pub fn unregister_typed(
typed_name: registry.TypedName(msg),
) -> Result(Nil, registry.UnregisterError)
Type-safe sibling of unregister/1: pulls the name string from the
TypedName the caller already holds. Recommended for graceful
shutdown paths so the cleanup site never has to hardcode the name.
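A graceful shutdown path along these lines might look as follows. `shutdown` is a hypothetical function name, and printing the failure is only a placeholder for real error handling.

```gleam
import distribute
import gleam/io

/// Hypothetical shutdown hook: release the global name using the
/// TypedName the caller already holds, and report any failure.
pub fn shutdown(typed_name: distribute.TypedName(msg)) -> Nil {
  case distribute.unregister_typed(typed_name) {
    Ok(Nil) -> Nil
    Error(err) ->
      io.println(
        "unregister failed: " <> distribute.unregister_error_to_string(err),
      )
  }
}
```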
pub fn unsubscribe(
monitor: process.Subject(cluster_monitor.Message),
listener: process.Subject(cluster_monitor.ClusterEvent),
) -> Nil