distribute/actor

Types

Errors that can occur when starting an actor.

pub type ActorError {
  StartFailed(actor.StartError)
  InvalidConfiguration(String)
}

Constructors

  • StartFailed(actor.StartError)

    The underlying OTP actor failed to start.

  • InvalidConfiguration(String)

    Invalid configuration provided.
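
Callers will usually pattern match on both constructors. The following is a minimal sketch of one way to turn an ActorError into a log message. describe_error is a hypothetical helper, not part of this module, and the import path distribute/actor is assumed from the module name above.

```gleam
import distribute/actor.{type ActorError, InvalidConfiguration, StartFailed}

/// Hypothetical helper: turn an ActorError into a human-readable message.
pub fn describe_error(error: ActorError) -> String {
  case error {
    // The wrapped StartError comes from gleam/otp/actor. It is not
    // inspected here because its constructors differ between gleam_otp
    // versions.
    StartFailed(_start_error) -> "actor failed to start"
    InvalidConfiguration(reason) -> "invalid configuration: " <> reason
  }
}
```

Because the case expression lists every constructor, the compiler will flag this helper if a new ActorError variant is added later.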

Values

pub fn child_spec_server(
  initial_state: state,
  decoder: fn(BitArray) -> Result(request, codec.DecodeError),
  handler: fn(request, state) -> receiver.Next(state),
) -> supervision.ChildSpecification(process.Subject(BitArray))

Create a supervision child spec for a request-response server.

Identical to child_spec_typed_actor but with clearer semantic intent for server patterns.

Example

import gleam/otp/static_supervisor as supervisor

supervisor.new(supervisor.OneForOne)
|> supervisor.add(actor.child_spec_server(
  Nil,
  request_decoder(),
  handle_request,
))
|> supervisor.start

pub fn child_spec_typed_actor(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> supervision.ChildSpecification(process.Subject(BitArray))

Create a supervision child spec for a typed actor.

This function returns a ChildSpecification that can be added to a gleam/otp/supervision supervisor tree. The supervisor will automatically restart the actor if it crashes.

Note: Returns Subject(BitArray) rather than GlobalSubject(msg) for OTP compatibility. Wrap with global.from_subject after supervision start if you need the typed wrapper.

Parameters

  • initial_state: The initial state for the actor
  • decoder: Decoder for messages
  • handler: Message handler function

Returns

A ChildSpecification(Subject(BitArray)) for use with supervisors.

Example

import gleam/otp/static_supervisor as supervisor

pub fn start_supervised() {
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(actor.child_spec_typed_actor(
    0,
    my_decoder(),
    my_handler,
  ))
  |> supervisor.start
}

pub fn child_spec_typed_actor_typed(
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> supervision.ChildSpecification(global.GlobalSubject(msg))

Create a child spec whose child data is a GlobalSubject(msg) rather than a raw Subject(BitArray). Use this when code that starts the child through a supervisor needs the typed subject directly, without wrapping it afterwards.
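
As an illustration, the sketch below adds this child spec to a one-for-one supervisor. It assumes gleam_otp 1.x, where gleam/otp/static_supervisor provides new, add, and start. start_tree is a hypothetical helper, and the encoder, decoder, and handler are supplied by the caller rather than defined here.

```gleam
import distribute/actor
import gleam/otp/static_supervisor as supervisor

/// Hypothetical helper: supervise a single typed actor with an Int state.
/// Assumes the gleam_otp 1.x static_supervisor builder API.
pub fn start_tree(encoder, decoder, handler) {
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(actor.child_spec_typed_actor_typed(
    0,
    encoder,
    decoder,
    handler,
  ))
  |> supervisor.start
}
```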

pub fn pool_supervisor(
  pool_size: Int,
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> Result(
  #(process.Pid, List(global.GlobalSubject(msg))),
  actor.StartError,
)

Create a worker pool with N typed actors under a supervisor.

This helper creates a pool of identical worker actors, all supervised under a single supervisor. The pool can be used for load balancing, parallel processing, or handling concurrent requests.

Parameters

  • pool_size: Number of worker actors to create
  • initial_state: Initial state for each worker (all start with same state)
  • encoder: Encoder for worker messages
  • decoder: Decoder for worker messages
  • handler: Message handler for workers

Returns

Ok(#(Pid, List(GlobalSubject(msg)))) with the supervisor PID and all worker subjects, or Error(StartError) if pool creation fails.

Example

// Create a pool of 5 worker actors
let result = actor.pool_supervisor(
  5,
  0,
  request_encoder(),
  request_decoder(),
  handle_request,
)

case result {
  Ok(#(_sup_pid, workers)) -> {
    // Round-robin distribution: task i goes to worker (i mod pool size)
    let worker_count = list.length(workers)
    list.index_map(tasks, fn(task, idx) {
      case list.drop(workers, idx % worker_count) |> list.first {
        Ok(w) -> global.send(w, task)
        Error(_) -> Error(Nil)
      }
    })
  }
  // Pool creation failed, so no tasks were dispatched
  Error(_err) -> []
}

pub fn start(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> Result(process.Subject(BitArray), actor.StartError)

Deprecated: Use start_typed_actor instead; will be removed in v3.0.0

Start an actor that returns a raw Subject(BitArray).

This is a lower-level helper that wraps receiver.start_typed_receiver. Prefer start_typed_actor which returns a type-safe GlobalSubject(msg).

Use this only when you need direct access to the underlying Subject(BitArray) for manual message encoding or integration with legacy code.

Parameters

  • initial_state: The initial state
  • decoder: Decoder for incoming BitArray messages
  • handler: Message handler

Returns

Ok(Subject(BitArray)) on success, Error(StartError) on failure.

pub fn start_global(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> process.Subject(BitArray)

Deprecated: Use start_typed_actor instead; will be removed in v3.0.0

Start a global actor returning a raw Subject(BitArray).

Similar to start but uses receiver.start_global_receiver which creates a subject compatible with global registry operations.

Prefer start_typed_actor which wraps this and returns a type-safe GlobalSubject(msg).

Parameters

  • initial_state: The initial state
  • decoder: Decoder for incoming messages
  • handler: Message handler

Returns

A Subject(BitArray) with a Nil tag for global use.

pub fn start_server(
  initial_state: state,
  encoder: fn(request) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(request, codec.DecodeError),
  handler: fn(request, state) -> receiver.Next(state),
) -> global.GlobalSubject(request)

Start a request-response server actor.

This behaves exactly like start_typed_actor. The separate name exists only to signal request-response intent at the call site.

Use this when your actor primarily processes requests and sends responses back to callers via reply subjects embedded in the request messages.

Parameters

  • initial_state: The initial server state
  • encoder: Encoder for request messages
  • decoder: Decoder for request messages
  • handler: Request handler that processes requests and sends replies

Returns

A GlobalSubject(request) for sending requests to the server.

Example

pub type Request {
  Add(Int, Int, Subject(Int))
  Multiply(Int, Int, Subject(Int))
}

let server = actor.start_server(
  Nil,
  request_encoder(),
  request_decoder(),
  fn(req, _state) {
    case req {
      Add(a, b, reply) -> {
        process.send(reply, a + b)
        receiver.Continue(Nil)
      }
      Multiply(a, b, reply) -> {
        process.send(reply, a * b)
        receiver.Continue(Nil)
      }
    }
  },
)

pub fn start_typed_actor(
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> global.GlobalSubject(msg)

Start a type-safe actor with global subject (RECOMMENDED).

This is the recommended way to create distributed actors. It wraps receiver.start_typed_actor for convenience and compatibility.

The returned GlobalSubject(msg) can be:

  • Registered globally with registry.register_typed
  • Used for cross-node messaging with messaging.send_typed
  • Passed to other processes safely

Messages that fail to decode are dropped silently rather than crashing the actor.

Parameters

  • initial_state: The initial state value
  • encoder: Encoder for outgoing messages (used by clients)
  • decoder: Decoder for incoming messages
  • handler: Message handler returning Next(state)

Returns

A GlobalSubject(msg) that enforces type-safe messaging.

Example

pub type Counter {
  Inc
  Dec
  GetValue(Subject(Int))
}

let counter = actor.start_typed_actor(
  0,
  counter_encoder(),
  counter_decoder(),
  fn(msg, count) {
    case msg {
      Inc -> receiver.Continue(count + 1)
      Dec -> receiver.Continue(count - 1)
      GetValue(reply) -> {
        process.send(reply, count)
        receiver.Continue(count)
      }
    }
  },
)

pub fn start_typed_actor_registered(
  name: String,
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> Result(global.GlobalSubject(msg), registry.RegisterError)

Start and register a typed actor under a global name.

This helper starts a typed actor and registers its subject with registry.register_typed(name, subject). Returns Ok(GlobalSubject(msg)) on success or Error(RegisterError) if registration fails.

Note: If registration fails, the started actor process will continue running. The caller is responsible for cleanup in error cases if needed.
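
The sketch below shows one way a caller might handle both outcomes. The Counter type, the name "counter", and start_named_counter are hypothetical. The import path distribute/receiver is an assumption based on the receiver. qualifier used in the signatures above. The encoder and decoder are passed in because their implementation depends on the codec module.

```gleam
import distribute/actor
import distribute/receiver

/// Hypothetical message type used only for this sketch.
pub type Counter {
  Inc
  Dec
}

/// Start a counter actor and register it under the name "counter".
pub fn start_named_counter(encoder, decoder) {
  let started =
    actor.start_typed_actor_registered(
      "counter",
      0,
      encoder,
      decoder,
      fn(msg, count) {
        case msg {
          Inc -> receiver.Continue(count + 1)
          Dec -> receiver.Continue(count - 1)
        }
      },
    )

  case started {
    Ok(subject) -> Ok(subject)
    // The actor process is still running here even though it has no
    // registered name. The caller decides whether to stop it or retry.
    Error(register_error) -> Error(register_error)
  }
}
```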

pub fn start_typed_actor_started(
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> Result(
  actor.Started(global.GlobalSubject(msg)),
  actor.StartError,
)

Start a typed actor via the OTP actor API and return the full Started result with GlobalSubject(msg) as the returned data. This is useful for supervision integration where the caller needs access to the pid.
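
As a sketch of how the Started value might be unpacked, the hypothetical helper below returns the pid and the typed subject as a tuple. It assumes gleam_otp 1.x, where Started is a record with pid and data fields.

```gleam
import distribute/actor

/// Hypothetical helper: start a typed actor and return its pid together
/// with its GlobalSubject.
pub fn start_and_unpack(initial_state, encoder, decoder, handler) {
  case
    actor.start_typed_actor_started(initial_state, encoder, decoder, handler)
  {
    // Assumes the gleam/otp/actor Started record exposes pid and data
    Ok(started) -> Ok(#(started.pid, started.data))
    Error(start_error) -> Error(start_error)
  }
}
```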

pub fn start_typed_actor_supervised(
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> receiver.Next(state),
) -> Result(
  #(process.Pid, global.GlobalSubject(msg)),
  actor.StartError,
)

Start a typed actor under a new supervisor in one call.

This high-level helper creates a supervisor, adds the typed actor as a child, and starts the supervisor tree. Returns both the supervisor PID and the typed actor’s GlobalSubject for convenient access.

Parameters

  • initial_state: The initial state for the actor
  • encoder: Encoder for messages
  • decoder: Decoder for messages
  • handler: Message handler function

Returns

Ok(#(Pid, GlobalSubject(msg))) on success with both handles, or Error(StartError) if supervision setup fails.

Example

let result = actor.start_typed_actor_supervised(
  0,
  counter_encoder(),
  counter_decoder(),
  fn(msg, count) {
    case msg {
      Increment -> receiver.Continue(count + 1)
      GetCount(reply) -> {
        process.send(reply, count)
        receiver.Continue(count)
      }
    }
  },
)

case result {
  Ok(#(_sup_pid, actor_subject)) -> {
    // Use actor_subject for messaging; the supervisor restarts the
    // actor if it crashes
    let _ = global.send(actor_subject, Increment)
    Nil
  }
  // Supervision setup failed; handle or report the StartError here
  Error(_err) -> Nil
}