gleam/otp/actor

This module provides the Actor abstraction, one of the most common building blocks of Gleam OTP programs.

An Actor is a process like any other BEAM process and can be used to hold state, execute code, and communicate with other processes by sending and receiving messages. The advantage of using the actor abstraction over a bare process is that it provides a single interface for commonly needed functionality, including support for the tracing and debugging features in OTP.

Gleam’s Actor is similar to Erlang’s gen_server and Elixir’s GenServer but differs in that it offers a fully typed interface. This different API is why Gleam uses the name Actor rather than some variation of generic-server.

Example

An Actor can be used to create a client-server interaction between an Actor (the server) and other processes (the clients). In this example we have an Actor that works as a stack, allowing clients to push and pop elements.

pub fn main() {
  // Start the actor with initial state of an empty list, and the
  // `handle_message` callback function (defined below).
  // We assert that it starts successfully.
  // 
  // In real-world Gleam OTP programs we would likely write wrapper functions
  // called `start`, `push`, `pop`, and `shutdown` to start and interact with the
  // Actor. We are not doing that here for the sake of showing how the Actor 
  // API works.
  let assert Ok(my_actor) = actor.start([], handle_message)

  // We can send a message to the actor to push elements onto the stack.
  process.send(my_actor, Push("Joe"))
  process.send(my_actor, Push("Mike"))
  process.send(my_actor, Push("Robert"))

  // The `Push` message expects no response. These messages are sent purely for
  // the side effect of mutating the state held by the actor.
  //
  // We can also send the `Pop` message to take a value off of the actor's
  // stack. This message expects a response, so we use `process.call` to send a
  // message and wait until a reply is received.
  //
  // In this instance we are giving the actor 10 milliseconds to reply. If the
  // `call` function doesn't get a reply within this time it will panic and
  // crash the client process.
  let assert Ok("Robert") = process.call(my_actor, Pop, 10)
  let assert Ok("Mike") = process.call(my_actor, Pop, 10)
  let assert Ok("Joe") = process.call(my_actor, Pop, 10)

  // The stack is now empty, so if we pop again the actor replies with an error.
  let assert Error(Nil) = process.call(my_actor, Pop, 10)

  // Lastly, we can send a message to the actor asking it to shut down.
  process.send(my_actor, Shutdown)
}

Here is the code that is used to implement this actor:

import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// First step of implementing the stack Actor is to define the message type that
// it can receive.
//
// The type of the elements in the stack is not fixed so a type parameter is used
// for it instead of a concrete type such as `String` or `Int`.
pub type Message(element) {
  // The `Shutdown` message is used to tell the actor to stop.
  // It is the simplest message type and contains no data.
  Shutdown

  // The `Push` message is used to add a new element to the stack.
  // It contains the item to add, the type of which is the `element`
  // parameterised type.
  Push(push: element)

  // The `Pop` message is used to remove an element from the stack.
  // It contains a `Subject`, which is used to send the response back to the
  // message sender. In this case the reply is of type `Result(element, Nil)`.
  Pop(reply_with: Subject(Result(element, Nil)))
}

// The last part is to implement the `handle_message` callback function.
//
// This function is called by the Actor for each message it receives.
// The Actor is single threaded and only does one thing at a time, so it
// handles messages sequentially, in the order they are received.
//
// The function takes the message and the current state, and returns a data
// structure that indicates what to do next, along with the new state.
fn handle_message(
  message: Message(e),
  stack: List(e),
) -> actor.Next(Message(e), List(e)) {
  case message {
    // For the `Shutdown` message we return the `actor.Stop` value, which causes
    // the actor to discard any remaining messages and stop.
    Shutdown -> actor.Stop(process.Normal)

    // For the `Push` message we add the new element to the stack and return
    // `actor.continue` with this new stack, causing the actor to process any
    // queued messages or wait for more.
    Push(value) -> {
      let new_state = [value, ..stack]
      actor.continue(new_state)
    }

    // For the `Pop` message we attempt to remove an element from the stack,
    // sending it or an error back to the caller, before continuing.
    Pop(client) ->
      case stack {
        [] -> {
          // When the stack is empty we can't pop an element, so we send an
          // error back.
          process.send(client, Error(Nil))
          actor.continue([])
        }

        [first, ..rest] -> {
          // Otherwise we send the first element back and use the remaining
          // elements as the new state.
          process.send(client, Ok(first))
          actor.continue(rest)
        }
      }
  }
}
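
The comments in `main` mention that real-world programs would usually hide the message protocol behind wrapper functions. The following is a minimal sketch of what those wrappers might look like. It assumes it lives in the same module as the `Message` type and `handle_message` function shown here, and the 10 millisecond timeout is simply carried over from the example. The function bodies are illustrative rather than part of this module's API.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// Hypothetical wrappers around the stack actor. They rely on the `Message`
// type and the `handle_message` function defined earlier in this module.

// Start a stack actor with an empty list as its initial state.
pub fn start() -> Result(Subject(Message(e)), actor.StartError) {
  actor.start([], handle_message)
}

// Push a value onto the stack. No reply is expected.
pub fn push(stack: Subject(Message(e)), value: e) -> Nil {
  process.send(stack, Push(value))
}

// Pop a value off the stack, waiting up to 10 milliseconds for the reply.
pub fn pop(stack: Subject(Message(e))) -> Result(e, Nil) {
  process.call(stack, Pop, 10)
}

// Ask the actor to stop.
pub fn shutdown(stack: Subject(Message(e))) -> Nil {
  process.send(stack, Shutdown)
}

// Client code no longer needs to know about the message constructors.
pub fn run() {
  let assert Ok(stack) = start()
  push(stack, "Joe")
  let assert Ok("Joe") = pop(stack)
  let assert Error(Nil) = pop(stack)
  shutdown(stack)
}
```

Keeping the constructors behind functions like these means the message type can change without touching every client.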

Types

An Erlang supervisor compatible process start result.

pub type ErlangStartResult =
  Result(Pid, Dynamic)

The type used to indicate whether an actor has started successfully or not.

pub type InitResult(state, message) {
  Ready(state: state, selector: Selector(message))
  Failed(String)
}

Constructors

  • Ready(state: state, selector: Selector(message))

    The actor has successfully initialised. The actor can start handling messages and the actor’s channel sender can be returned to the parent process.

  • Failed(String)

    The actor has failed to initialise. The actor shuts down and an error is returned to the parent process.

The type used to indicate what to do after handling a message.

pub type Next(message, state) {
  Continue(state: state, selector: Option(Selector(message)))
  Stop(ExitReason)
}

Constructors

  • Continue(state: state, selector: Option(Selector(message)))

    Continue handling messages.

    An optional selector can be provided to change the messages that the actor is handling. This replaces any selector that was previously given in the actor’s init callback, or in any previous Next value.

  • Stop(ExitReason)

    Stop handling messages and shut down.

This data structure holds all the values required by the start_spec function in order to create an actor.

If you do not need to configure the initialisation behaviour of your actor consider using the start function.

pub type Spec(state, msg) {
  Spec(
    init: fn() -> InitResult(state, msg),
    init_timeout: Int,
    loop: fn(msg, state) -> Next(msg, state),
  )
}

Constructors

  • Spec(
      init: fn() -> InitResult(state, msg),
      init_timeout: Int,
      loop: fn(msg, state) -> Next(msg, state),
    )

    Arguments

    • init

      The initialisation functionality for the actor. This function is called just after the actor starts but before the channel sender is returned to the parent.

      This function is used to ensure that any required data or state is correct. If this function returns an error it means that the actor has failed to start and an error is returned to the parent.

    • init_timeout

      How many milliseconds the init function has to return before it is considered to have taken too long and failed.

    • loop

      This function is called to handle each message that the actor receives.
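
As an illustration of the three Spec fields, here is a minimal sketch of a counter actor whose init function validates its starting value. The `Message` type, `handle_message`, `start_counter`, and the 1000 millisecond timeout are hypothetical names and values chosen for this example. It assumes that the actor listens on its own subject in addition to the selector returned from init, so an empty selector is enough when no extra channels are needed.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// Hypothetical message type for a simple counter.
pub type Message {
  Increment
  Get(reply_with: Subject(Int))
}

fn handle_message(message: Message, count: Int) -> actor.Next(Message, Int) {
  case message {
    Increment -> actor.continue(count + 1)
    Get(client) -> {
      process.send(client, count)
      actor.continue(count)
    }
  }
}

pub fn start_counter(
  initial: Int,
) -> Result(Subject(Message), actor.StartError) {
  actor.start_spec(actor.Spec(
    // Runs inside the new actor process before the parent gets a result.
    init: fn() {
      case initial >= 0 {
        // No extra channels to listen on, so an empty selector is given.
        True -> actor.Ready(initial, process.new_selector())
        False -> actor.Failed("initial count must not be negative")
      }
    },
    // Give init up to 1000 milliseconds to return.
    init_timeout: 1000,
    loop: handle_message,
  ))
}
```

With this sketch, `start_counter(5)` should return `Ok` with a subject for the counter, while `start_counter(-1)` should return an `Error`.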

pub type StartError {
  InitTimeout
  InitFailed(ExitReason)
  InitCrashed(Dynamic)
}

Constructors

  • InitTimeout
  • InitFailed(ExitReason)
  • InitCrashed(Dynamic)

The result of starting a Gleam actor.

This type is compatible with Gleam supervisors. If you wish to convert it to a type compatible with Erlang supervisors see the ErlangStartResult type and to_erlang_start_result function.

pub type StartResult(msg) =
  Result(Subject(msg), StartError)

Functions

pub fn call(
  subject: Subject(a),
  make_message: fn(Subject(b)) -> a,
  timeout: Int,
) -> b

Send a synchronous message and wait for a response from the receiving process.

If a reply is not received within the given timeout then the sender process crashes. If you wish to receive a Result rather than crashing see the process.try_call function.

This is a re-export of process.call, for the sake of convenience.
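
The `make_message` argument is typically a message constructor that takes the reply subject as its only argument. The sketch below shows this with a hypothetical counter actor (the `Message` type and `handle_message` are assumptions made for the example), along with `process.try_call` as the non-crashing alternative.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// Hypothetical message type for a simple counter.
pub type Message {
  Increment
  Get(reply_with: Subject(Int))
}

fn handle_message(message: Message, count: Int) -> actor.Next(Message, Int) {
  case message {
    Increment -> actor.continue(count + 1)
    Get(client) -> {
      process.send(client, count)
      actor.continue(count)
    }
  }
}

pub fn main() {
  let assert Ok(counter) = actor.start(0, handle_message)

  // Fire-and-forget messages: no reply is expected.
  actor.send(counter, Increment)
  actor.send(counter, Increment)

  // `Get` has the type `fn(Subject(Int)) -> Message`, so it can be passed
  // directly as `make_message`. The call waits up to 10 milliseconds.
  let assert 2 = actor.call(counter, Get, 10)

  // `process.try_call` returns a Result instead of crashing on timeout.
  let count = case process.try_call(counter, Get, 10) {
    Ok(n) -> n
    Error(_) -> 0
  }
  let assert 2 = count
}
```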

pub fn continue(state: a) -> Next(b, a)

pub fn send(subject: Subject(a), msg: a) -> Nil

Send a message over a given channel.

This is a re-export of process.send, for the sake of convenience.

pub fn start(
  state: a,
  loop: fn(b, a) -> Next(b, a),
) -> Result(Subject(b), StartError)

Start an actor with a given initial state and message handling loop function.

This function returns a Result but it will always be Ok so it is safe to use with let assert if you are not starting this actor as part of a supervision tree.

If you wish to configure the initialisation behaviour of a new actor see the Spec record and the start_spec function.

pub fn start_spec(
  spec: Spec(a, b),
) -> Result(Subject(b), StartError)

Start an actor from a given specification. If the actor’s init function returns an error or does not return within init_timeout then an error is returned.

If you do not need to specify the initialisation behaviour of your actor consider using the start function.

pub fn to_erlang_start_result(
  res: Result(Subject(a), StartError),
) -> Result(Pid, Dynamic)

Convert a Gleam actor start result into an Erlang supervisor-compatible process start result.
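
A minimal sketch of how this conversion might be used when an actor has to be started from an Erlang supervisor. The `Message` type, `handle_message`, and the `start_link` name are hypothetical and only serve the example.

```gleam
import gleam/otp/actor

// Hypothetical message type with a single message.
pub type Message {
  Ping
}

fn handle_message(_message: Message, state: Nil) -> actor.Next(Message, Nil) {
  actor.continue(state)
}

// Returns `Result(Pid, Dynamic)`, the shape described by ErlangStartResult.
pub fn start_link() -> actor.ErlangStartResult {
  actor.start(Nil, handle_message)
  |> actor.to_erlang_start_result
}
```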

pub fn with_selector(
  value: Next(a, b),
  selector: Selector(a),
) -> Next(a, b)

Provide a selector to change the messages that the actor is handling going forward. This replaces any selector that was previously given in the actor’s init callback, or in any previous Next value.
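
The sketch below shows one way `with_selector` might be used: an actor that opens an additional channel on request and starts listening on it. The `Message` type and `handle_message` are hypothetical. It assumes that the actor keeps receiving on its own subject after the selector is replaced, and it relies on a subject only being receivable by the process that created it, which is why the new subject is created inside the handler.

```gleam
import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// Hypothetical message type for an actor that sums numbers.
pub type Message {
  // Ask the actor to open an extra channel that accepts plain integers.
  OpenChannel(reply_with: Subject(Subject(Int)))
  Add(Int)
  Total(reply_with: Subject(Int))
}

fn handle_message(message: Message, total: Int) -> actor.Next(Message, Int) {
  case message {
    OpenChannel(client) -> {
      // Created here so that the subject is owned by the actor process.
      let numbers: Subject(Int) = process.new_subject()

      // Integers arriving on `numbers` are wrapped in `Add`.
      let selector =
        process.new_selector()
        |> process.selecting(numbers, Add)

      process.send(client, numbers)
      actor.continue(total)
      |> actor.with_selector(selector)
    }

    Add(n) -> actor.continue(total + n)

    Total(client) -> {
      process.send(client, total)
      actor.continue(total)
    }
  }
}

pub fn main() {
  let assert Ok(summer) = actor.start(0, handle_message)
  let numbers = process.call(summer, OpenChannel, 10)
  process.send(numbers, 2)
  process.send(numbers, 3)
  let assert 5 = process.call(summer, Total, 10)
}
```

Because each new selector replaces the previous one, a second `OpenChannel` request in this sketch would stop the actor listening on the first extra channel.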
