distribute


Distribute brings the full power of Erlang’s distributed computing to Gleam, with a type-safe and gleam_otp-integrated API.

While Gleam runs on the BEAM, accessing distributed primitives (like connecting nodes, global registration, or RPC) traditionally required dealing with untyped atoms and unsafe Erlang terms. Distribute v2.0 solves this by providing:

Type-safe messaging: binary codecs (Encoder(a), Decoder(a)) carried over gleam/erlang/process.Subject(BitArray)
Full gleam_otp integration: works with actors and selectors
Explicit error handling: all operations return typed Result values
Composable codecs: built-in support for primitives, Option, Result, tuples, and custom types
Production-ready: comprehensive error handling, with the legacy untyped APIs clearly deprecated

Note: Use the _typed variants of all functions (e.g., send_global_typed, broadcast_typed) for full type safety. Legacy untyped functions are deprecated and will be removed in v3.0.
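Conceptually, an Encoder(a) turns a value into a BitArray, and a Decoder(a) turns bytes back into a value or fails. The sketch below models that idea with plain gleam_stdlib functions. The Encoder/Decoder aliases and the utf8_encoder/utf8_decoder names are hypothetical. They are not distribute's actual codec types.

```gleam
import gleam/bit_array

// Hypothetical stand-ins for distribute's Encoder(a) and Decoder(a):
// an encoder turns a value into bytes, a decoder turns bytes back into
// a value or reports failure.
pub type Encoder(a) =
  fn(a) -> BitArray

pub type Decoder(a) =
  fn(BitArray) -> Result(a, Nil)

// A UTF-8 string encoder built on gleam/bit_array.
pub fn utf8_encoder() -> Encoder(String) {
  bit_array.from_string
}

// The matching decoder; invalid UTF-8 yields Error(Nil).
pub fn utf8_decoder() -> Decoder(String) {
  bit_array.to_string
}

// Encode, then decode. A well-formed codec returns the original value.
pub fn round_trip(
  value: a,
  encoder: Encoder(a),
  decoder: Decoder(a),
) -> Result(a, Nil) {
  decoder(encoder(value))
}
```

Because the decoder returns a Result, malformed bytes surface as an Error rather than a crash. This mirrors the explicit error handling described above.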

Installation

Add the package with the Gleam CLI:

gleam add distribute

or declare the dependencies in gleam.toml directly:

# gleam.toml
[dependencies]
gleam_stdlib = ">= 0.43.0"
gleam_erlang = ">= 0.5.0"
gleam_otp = ">= 0.1.0"
distribute = "~> 2.0"

Documentation

Quick Start (Type-Safe API)

1. Start a Distributed Node

import distribute/node_builder
import gleam/io

pub fn main() {
  // Start this node with builder pattern
  let assert Ok(_) = node_builder.new()
    |> node_builder.with_name("app@127.0.0.1")
    |> node_builder.with_cookie("secret")
    |> node_builder.start()
  
  io.println("Node started!")
}

2. Connect to Another Node

import distribute/cluster
import gleam/io
import gleam/string

pub fn connect_peer() {
  case cluster.connect("other@127.0.0.1") {
    Ok(_) -> io.println("Connected to peer!")
    Error(err) -> io.println("Connection failed: " <> string.inspect(err))
  }
}

3. Register and Lookup Processes Globally (Type-Safe)

import distribute/codec
import distribute/global
import distribute/messaging
import distribute/receiver
import distribute/registry
import gleam/io

pub fn register_and_lookup() {
  // Create a type-safe global subject with an encoder/decoder pair.
  // Named `service` so the variable does not shadow the `global` module.
  let encoder = codec.string_encoder()
  let decoder = codec.string_decoder()
  let service = global.new(encoder, decoder)

  // Register it globally
  let _ = registry.register_typed("my_service", global.subject(service))

  // From another node/process: look up the service type-safely
  let assert Ok(remote_service) =
    registry.whereis_global("my_service", encoder, decoder)

  // Send a typed message through the GlobalSubject
  let _ = global.send(remote_service, "Hello, world!")

  // Or use the messaging API directly, addressing the service by name
  let _ = messaging.send_global_typed("my_service", "Hello, world!", encoder)

  // Receive typed messages
  let assert Ok(msg) =
    receiver.receive_typed(global.subject(service), decoder, 1000)
  io.println(msg)
}

4. Process Groups (Type-Safe)

import distribute/codec
import distribute/codec/builder
import distribute/global
import distribute/groups

pub type Task {
  Task(name: String, id: Int)
}

pub fn use_groups() {
  // Build a codec for Task
  let #(task_encoder, task_decoder) = builder.custom2(
    codec.string_encoder(),
    codec.int_encoder(),
    codec.string_decoder(),
    codec.int_decoder(),
    Task,
    fn(t) { #(t.name, t.id) },
  )
  
  // Create a global subject for group communication.
  // Named `worker` so the variable does not shadow the `global` module.
  let worker = global.new(task_encoder, task_decoder)

  // Join a group named "workers"
  let _ = groups.join_typed("workers", global.subject(worker))

  // Send a typed message through this member's subject
  // (use broadcast_typed to reach every group member)
  let _ = global.send(worker, Task("process_data", 42))

  // Get members list
  let members = groups.members_typed("workers")
}
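builder.custom2 combines two field codecs with a constructor and a destructuring function. The sketch below shows, with plain bit array syntax, one plausible way such a pair could lay out a Task on the wire. The length-prefixed format and the encode_task/decode_task names are assumptions for illustration. They are not distribute's documented encoding.

```gleam
import gleam/bit_array
import gleam/result

pub type Task {
  Task(name: String, id: Int)
}

// Hypothetical wire layout (not necessarily distribute's):
// a 32-bit byte length of the name, the UTF-8 name bytes, then a 64-bit id.
pub fn encode_task(task: Task) -> BitArray {
  let name_bytes = bit_array.from_string(task.name)
  let name_len = bit_array.byte_size(name_bytes)
  let id = task.id
  <<name_len:size(32), name_bytes:bits, id:size(64)>>
}

pub fn decode_task(bits: BitArray) -> Result(Task, Nil) {
  case bits {
    // The length read from the first segment sizes the name segment;
    // the id is read back as signed so negative ids survive the trip.
    <<name_len:size(32), name_bytes:bytes-size(name_len), id:signed-size(64)>> -> {
      use name <- result.map(bit_array.to_string(name_bytes))
      Task(name, id)
    }
    // Truncated input, trailing bytes, or a bad length prefix
    _ -> Error(Nil)
  }
}
```

The length prefix tells the decoder where the variable-length name ends and the fixed-size id begins.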

5. SWIM Membership Service

import distribute/cluster/membership

pub fn use_membership() {
  // Start the background membership service (probe every 500ms)
  membership.start_service(500)

  // Get a list of alive nodes
  let alive = membership.alive()

  // Get nodes with full status details
  let nodes = membership.members_with_status()
  // Returns: [#("node@host", Alive, 0), ...]

  // Get current leader (lexicographically largest alive node)
  let leader = membership.current_leader()

  // Stop the service when done
  membership.stop_service()
}
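The leader rule noted above (the lexicographically largest alive node) is easy to reason about in isolation. This stdlib-only sketch assumes a hypothetical Status type shaped like the members_with_status() output. It is not the library's implementation. The third tuple element is ignored because its meaning is not documented here.

```gleam
import gleam/list
import gleam/order
import gleam/string

// Hypothetical status type mirroring the members_with_status() shape;
// distribute's real type may have different constructors.
pub type Status {
  Alive
  Suspect
  Dead
}

// Keep only the names of nodes marked Alive, preserving order.
pub fn alive_nodes(members: List(#(String, Status, Int))) -> List(String) {
  list.filter_map(members, fn(member) {
    case member {
      #(name, Alive, _) -> Ok(name)
      _ -> Error(Nil)
    }
  })
}

// Pick the lexicographically largest name, or Error(Nil) if the list is empty.
pub fn pick_leader(nodes: List(String)) -> Result(String, Nil) {
  list.reduce(nodes, fn(best, node) {
    case string.compare(node, best) {
      order.Gt -> node
      _ -> best
    }
  })
}
```

The rule is deterministic, so nodes that share the same membership view pick the same leader.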

6. Leader Election (Raft-lite)

import distribute/election/raft_lite
import gleam/io

pub fn elect_leader() {
  case raft_lite.elect() {
    raft_lite.Leader(name) -> io.println("Current Leader: " <> name)
    raft_lite.NoLeader -> io.println("No leader available yet")
  }
}
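raft_lite is described as using term-based voting. For reference, the sketch below shows the core vote-granting rule from Raft:

- reject requests from older terms,
- move to a newer term when one is seen,
- grant at most one vote per term.

VoterState and handle_vote_request are hypothetical names. Full Raft's log up-to-date check is omitted, and this is not raft_lite's internal code.

```gleam
import gleam/option.{type Option, None, Some}

// Hypothetical voter state: the highest term seen and which candidate,
// if any, received this node's vote in that term.
pub type VoterState {
  VoterState(current_term: Int, voted_for: Option(String))
}

// Decide whether to grant a vote; returns #(granted, new_state).
// Simplified Raft rule (no log up-to-date check).
pub fn handle_vote_request(
  state: VoterState,
  candidate: String,
  term: Int,
) -> #(Bool, VoterState) {
  case term < state.current_term {
    // Stale candidate: refuse and keep the current state.
    True -> #(False, state)
    False -> {
      // A newer term clears any vote cast in an older term.
      let voted_for = case term > state.current_term {
        True -> None
        False -> state.voted_for
      }
      case voted_for {
        None -> #(True, VoterState(term, Some(candidate)))
        // Re-granting to the same candidate is idempotent.
        Some(existing) if existing == candidate ->
          #(True, VoterState(term, voted_for))
        Some(_) -> #(False, VoterState(term, voted_for))
      }
    }
  }
}
```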

7. Remote Procedure Calls

import distribute/remote_call
import gleam/io
import gleam/string

pub fn call_remote() {
  // Call erlang:node() on a remote node
  case remote_call.call("other@host", "erlang", "node", [], []) {
    Ok(result) -> io.println("Remote node name: " <> string.inspect(result))
    Error(remote_call.RpcBadRpc(reason)) -> io.println("RPC failed: " <> reason)
    Error(_) -> io.println("RPC error")
  }
}

Module Reference

Module | Description
--- | ---
distribute/cluster | Node management: start, connect, ping, list nodes
distribute/registry | Global process registration with type-safe subjects
distribute/messaging | Type-safe cross-node messaging with codecs
distribute/groups | Process groups with type-safe broadcast
distribute/monitor | Monitor processes and nodes
distribute/remote_call | Type-safe RPC to remote nodes
distribute/codec | Binary encoding/decoding for primitives and composite types
distribute/codec/builder | Helpers for building custom type codecs
distribute/codec/tagged | Tag and version validation for protocol safety
distribute/global | Type-safe global subjects with integrated codecs
distribute/receiver | Type-safe message receiving with codecs
distribute/sugar | Convenience helpers for common patterns
distribute/actor | Simple actor wrappers
distribute/cluster/membership | SWIM-like membership with gossip and failure detection
distribute/cluster/gossip | Gossip protocol for membership state propagation
distribute/cluster/health | Health checks for nodes and cluster
distribute/election/raft_lite | Lightweight leader election with term-based voting

Note: Legacy untyped functions (send_global, broadcast, call) are deprecated and will be removed in v3.0. Use the _typed variants with codecs for full type safety.
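distribute/codec/tagged is described as validating a tag and version for protocol safety. The minimal sketch below assumes a one-byte tag and a one-byte version prepended to the payload. This layout is hypothetical, and add_tag, check_tag and TagError are illustrative names, not the module's API.

```gleam
// Hypothetical framing errors; not distribute's actual error type.
pub type TagError {
  WrongTag(expected: Int, got: Int)
  WrongVersion(expected: Int, got: Int)
  Truncated
}

// Prepend a tag byte and a version byte (both must fit in 0..255).
pub fn add_tag(payload: BitArray, tag_id: Int, version: Int) -> BitArray {
  <<tag_id:size(8), version:size(8), payload:bits>>
}

// Strip and validate the header, returning the payload on success.
pub fn check_tag(
  bits: BitArray,
  tag_id: Int,
  version: Int,
) -> Result(BitArray, TagError) {
  case bits {
    <<got_tag:size(8), got_version:size(8), payload:bits>> ->
      case got_tag == tag_id, got_version == version {
        False, _ -> Error(WrongTag(expected: tag_id, got: got_tag))
        True, False -> Error(WrongVersion(expected: version, got: got_version))
        True, True -> Ok(payload)
      }
    // Fewer than two bytes: no complete header
    _ -> Error(Truncated)
  }
}
```

Rejecting a mismatched tag or version before decoding means a peer using a different message format gets an explicit error instead of a misparsed value.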

Running Tests

gleam test

Integration Tests

You can run a multi-node SWIM integration test using the provided script:

./examples/two_nodes/swim_integration.sh

This script starts 3 local Erlang nodes, runs the membership service, and verifies gossip convergence.

Note: The legacy wrapper examples/two_nodes/run_swim_integration.sh has been removed — run examples/two_nodes/swim_integration.sh directly.

Note: Scripts under examples/ are manual integration demos. They rely on local node naming (-sname + hostname) and are not intended to be a CI hard requirement.

Architecture

src/
├── distribute.gleam        # Top-level module
├── distribute/
│   ├── cluster.gleam       # Node management
│   ├── registry.gleam      # Global registry
│   ├── messaging.gleam     # Cross-node messaging
│   ├── groups.gleam        # Process groups
│   ├── monitor.gleam       # Process/node monitoring
│   ├── remote_call.gleam   # RPC
│   ├── cluster/
│   │   ├── membership.gleam # SWIM-like membership
│   │   ├── gossip.gleam     # Gossip protocol
│   │   └── health.gleam     # Health checks
│   └── election/
│       └── raft_lite.gleam  # Leader election
└── *_ffi.erl               # Erlang FFI files (in src root)

Design Philosophy

  1. No magic — Everything is explicit.
  2. Zero custom runtime — Only wraps standard BEAM features.
  3. Small but complete APIs — Just what you need for clustering.
  4. Type-safe — Gleam’s type system prevents common errors.
  5. Compatible with gleam/otp — Works alongside standard OTP patterns.

Safety

Settings

This library exposes a small settings API (distribute/settings) that controls behaviour affecting safety and logging.

Example:

import distribute/settings

pub fn main() {
  // Secure default — don't allow uncontrolled atom creation
  settings.set_allow_atom_creation(False)

  // Prefer crypto-based correlation ids
  settings.set_use_crypto_ids(True)
}

Logging & Correlation IDs

The distribute/log module provides structured logging with metadata and correlation IDs.

Example:

import distribute/log
import distribute/settings

pub fn main() {
  settings.set_use_crypto_ids(True)
  log.set_backend("erlang_logger")
  let id = log.generate_correlation_id()
  log.info_with_correlation("Starting operation", [#("user", "alice")], id)
}
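For context on settings.set_use_crypto_ids(True): a crypto-based correlation id is typically drawn from a cryptographically strong random source. The sketch below uses the gleam_crypto package, an extra dependency assumed here. The function name is hypothetical, and distribute/log does not necessarily generate ids this way.

```gleam
import gleam/bit_array
import gleam/crypto

// Hypothetical helper: 16 bytes from the BEAM's strong RNG (via gleam_crypto),
// hex-encoded into a 32-character string.
pub fn crypto_correlation_id() -> String {
  crypto.strong_random_bytes(16)
  |> bit_array.base16_encode
}
```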

RPC Timeouts & Error Handling

Remote calls use a default timeout of 5000ms. Use remote_call.call_with_timeout to customise the timeout for long-running calls.

Example with a custom timeout:

import distribute/remote_call
import gleam/io
import gleam/string

pub fn call_remote_with_timeout() {
  // Call erlang:node() on a remote node with a 10s timeout
  case remote_call.call_with_timeout("other@host", "erlang", "node", [], [], 10_000) {
    Ok(result) -> io.println("Remote node name: " <> string.inspect(result))
    Error(remote_call.RpcBadRpc(reason)) -> io.println("RPC failed: " <> reason)
    Error(_) -> io.println("RPC error")
  }
}

Contributing

Contributions welcome! Please open an issue or PR on GitHub.

License

This project is licensed under the MIT License - see the LICENSE file for details.
