eparch/state_machine

A type-safe, OTP-compatible, finite state machine implementation that leverages Erlang’s gen_statem behavior throught the Gleam ffi.

Differences from gen_statem

Unlike Erlang’s gen_statem, this implementation:

Types

Actions (side effects) to perform after handling an event.

Multiple actions can be returned as a list.

pub type Action(msg, reply) {
  Reply(from: From(reply), response: reply)
  Postpone
  NextEvent(content: msg)
  StateTimeout(milliseconds: Int)
  GenericTimeout(name: String, milliseconds: Int)
  ChangeCallbackModule(module: atom.Atom)
  PushCallbackModule(module: atom.Atom)
  PopCallbackModule
}

Constructors

  • Reply(from: From(reply), response: reply)

    Send a reply to a caller

  • Postpone

    Postpone this event until after a state change

  • NextEvent(content: msg)

    Insert a new event at the front of the queue

  • StateTimeout(milliseconds: Int)

    Set a state timeout (canceled on state change)

  • GenericTimeout(name: String, milliseconds: Int)

    Set a generic named timeout

  • ChangeCallbackModule(module: atom.Atom)

    Change the gen_statem callback module to module. The new module receives the internal #gleam_statem record as its data, use only for Erlang interop with modules that understand eparch’s internals.

  • PushCallbackModule(module: atom.Atom)

    Push the current callback module onto an internal stack and switch to module. Pop with PopCallbackModule to restore. Otherwise like ChangeCallbackModule.

  • PopCallbackModule

    Pop the top module from the internal callback-module stack and switch to it. Fails the server if the stack is empty.

A builder for configuring a state machine before starting it.

Generic parameters:

  • state: The type of state values (e.g., enum, custom type)
  • data: The type of data carried across state transitions
  • msg: The type of messages the state machine receives
  • return: What the start function returns to the parent
pub opaque type Builder(state, data, msg, return, reply)

Events that a state machine can receive.

This unifies the three types of messages in OTP:

  • Calls (synchronous, requires reply)
  • Casts (asynchronous / fire-and-forget)
  • Info (other messages, from selectors/monitors)
pub type Event(state, msg, reply) {
  Call(from: From(reply), message: msg)
  Cast(message: msg)
  Info(message: msg)
  Enter(old_state: state)
  Timeout(timeout: TimeoutType)
}

Constructors

  • Call(from: From(reply), message: msg)

    A synchronous call that expects a reply

  • Cast(message: msg)

    An asynchronous cast (fire-and-forget)

  • Info(message: msg)

    An info message (from selectors, monitors, etc)

  • Enter(old_state: state)

    Internal event fired when entering a state (if state_enter enabled) Contains the previous state

  • Timeout(timeout: TimeoutType)

    Timeout events (state timeout or generic timeout)

Opaque reference to a caller (for replying to calls).

Represents Erlang’s gen_statem:from() type. Values of this type only ever originate from a Call event delivered by the gen_statem runtime.

pub type From(reply)

Errors that can occur when starting a state machine.

pub type StartError {
  InitTimeout
  InitFailed(String)
  InitExited(process.ExitReason)
}

Constructors

Convenience type for start results.

pub type StartResult(data) =
  Result(Started(data), StartError)

Data returned when a state machine starts successfully.

pub type Started(data) {
  Started(pid: process.Pid, data: data)
}

Constructors

  • Started(pid: process.Pid, data: data)

    Arguments

    pid

    The process identifier of the started state machine

    data

    Data returned after initialization (typically a Subject)

The result of handling an event.

Indicates what the state machine should do next.

pub type Step(state, data, msg, reply) {
  NextState(
    state: state,
    data: data,
    actions: List(Action(msg, reply)),
  )
  KeepState(data: data, actions: List(Action(msg, reply)))
  Stop(reason: process.ExitReason)
}

Constructors

  • NextState(
      state: state,
      data: data,
      actions: List(Action(msg, reply)),
    )

    Transition to a new state

  • KeepState(data: data, actions: List(Action(msg, reply)))

    Keep the current state

  • Stop(reason: process.ExitReason)

    Stop the state machine

Types of timeouts

pub type TimeoutType {
  StateTimeoutType
  GenericTimeoutType(name: String)
}

Constructors

  • StateTimeoutType
  • GenericTimeoutType(name: String)

Values

pub fn call(
  subject: process.Subject(message),
  waiting timeout: Int,
  sending make_message: fn(process.Subject(reply)) -> message,
) -> reply

Send a synchronous call and wait for a reply.

This is a re-export of process.call for convenience.

pub fn cast(subject: process.Subject(msg), msg: msg) -> Nil

Send an asynchronous cast to a state machine (arrives as Cast).

Unlike send, which routes messages through process.send and delivers them as Info(msg), this function calls gen_statem:cast so messages arrive as Cast(msg) in the event handler.

Use cast when you want to distinguish machine-level commands from ambient info messages (monitors, raw Erlang signals, etc.).

Example

fn handle_event(event, state, data) {
  case event {
    Cast(Increment) -> keep_state(data + 1, [])
    Info(_)         -> keep_state(data, [])   // ignore ambient noise
    _               -> keep_state(data, [])
  }
}
pub fn change_callback_module(
  module: atom.Atom,
) -> Action(msg, reply)

Create a ChangeCallbackModule action.

Swaps the gen_statem callback module at runtime. The new module receives the internal #gleam_statem record as its data, so it must be an Erlang module written to understand eparch internals. Use only for advanced Erlang interop; most applications should not need this.

Since OTP 22.3.

Example

state_machine.change_callback_module(atom.create("my_erlang_module"))
pub fn generic_timeout(
  name: String,
  milliseconds: Int,
) -> Action(msg, reply)

Create a GenericTimeout action.

Sets a named timeout that persists across state changes.

pub fn keep_state(
  data: data,
  actions: List(Action(msg, reply)),
) -> Step(state, data, msg, reply)

Create a KeepState step indicating no state change.

Example

keep_state(data, [])
pub fn named(
  builder: Builder(state, data, msg, return, reply),
  name: process.Name(msg),
) -> Builder(state, data, msg, return, reply)

Provide a name for the state machine to be registered with when started.

This enables sending messages to it via a named subject.

pub fn new(
  initial_state initial_state: state,
  initial_data initial_data: data,
) -> Builder(state, data, msg, process.Subject(msg), reply)

Create a new state machine builder with initial state and data.

By default, the state machine will return a Subject that can be used to send messages to it.

Example

state_machine.new(initial_state: Idle, initial_data: 0)
|> state_machine.on_event(handle_event)
|> state_machine.start
pub fn next_event(content: msg) -> Action(msg, reply)

Create a NextEvent action.

Inserts a new event at the front of the event queue.

pub fn next_state(
  state: state,
  data: data,
  actions: List(Action(msg, reply)),
) -> Step(state, data, msg, reply)

Create a NextState step indicating a state transition.

Example

next_state(Active, new_data, [StateTimeout(5000)])
pub fn on_code_change(
  builder: Builder(state, data, msg, return, reply),
  handler: fn(data) -> data,
) -> Builder(state, data, msg, return, reply)

Provide a migration function called during hot-code upgrades.

When an OTP release upgrades the running code, gen_statem calls code_change/4. If a migration function is set, it receives the current data value and its return value becomes the new data. Use this to migrate data structures between versions without restarting the process.

If not set, the data passes through unchanged (the default and safe behaviour for most applications).

Example

// Old data shape: Int
// New data shape: Data(count: Int, label: String)
state_machine.new(Idle, 0)
|> state_machine.on_code_change(fn(old_count) { Data(old_count, "default") })
|> state_machine.on_event(handle_event)
|> state_machine.start
pub fn on_event(
  builder: Builder(state, data, msg, return, reply),
  handler: fn(Event(state, msg, reply), state, data) -> Step(
    state,
    data,
    msg,
    reply,
  ),
) -> Builder(state, data, msg, return, reply)

Set the event handler callback function.

This function is called for every event the state machine receives. It takes the current event, state, and data, and returns a Step indicating what to do next.

Example

fn handle_event(event, state, data) {
  case event, state {
    Call(from, GetCount), Running ->
      keep_state(data, [Reply(from, data.count)])

    Cast(Increment), Running ->
      keep_state(Data(..data, count: data.count + 1), [])

    _, _ -> keep_state(data, [])
  }
}

state_machine.new(Running, Data(0))
|> state_machine.on_event(handle_event)
|> state_machine.start
pub fn pop_callback_module() -> Action(msg, reply)

Create a PopCallbackModule action.

Pops the top module off the callback-module stack and switches to it. Fails the server if the stack is empty, so only use after a matching push_callback_module.

Since OTP 22.3.

Example

state_machine.pop_callback_module()
pub fn postpone() -> Action(msg, reply)

Create a Postpone action.

Postpones the current event until after the next state change.

pub fn push_callback_module(
  module: atom.Atom,
) -> Action(msg, reply)

Create a PushCallbackModule action.

Pushes the current callback module onto an internal stack and switches to module. Restore the previous module with pop_callback_module. Same data-sharing caveats as change_callback_module apply.

Since OTP 22.3.

Example

state_machine.push_callback_module(atom.create("my_erlang_module"))
pub fn reply(
  from: From(reply),
  response: reply,
) -> Action(msg, reply)

Create a Reply action.

Example

case event {
  Call(from, GetData) -> keep_state(data, [Reply(from, data)])
  _ -> keep_state(data, [])
}
pub fn reply_and_keep(
  from: From(reply),
  response: reply,
  data: data,
) -> Step(state, data, msg, reply)

Reply and keep the current state.

Example

reply_and_keep(from, Ok(data.count), data)
pub fn reply_and_next(
  from: From(reply),
  response: reply,
  state: state,
  data: data,
) -> Step(state, data, msg, reply)

Reply and transition to a new state.

Example

reply_and_next(from, Ok(Nil), Active, new_data)
pub fn send(subject: process.Subject(msg), msg: msg) -> Nil

Send a message to a state machine via process.send (arrives as Info).

The message is delivered to the handler as Info(msg). Use this for messages sent from processes that are not aware of this library, e.g. monitors, timers, or plain Erlang processes.

To deliver messages as Cast(msg) instead, use cast/2.

pub fn start(
  builder: Builder(state, data, msg, process.Subject(msg), reply),
) -> Result(Started(process.Subject(msg)), StartError)

Start the state machine process.

Spawns a linked gen_statem process, runs initialisation, and returns a Started value containing the PID and a Subject that can be used to send messages to the machine.

Example

let assert Ok(machine) =
  state_machine.new(initial_state: Idle, initial_data: 0)
  |> state_machine.on_event(handle_event)
  |> state_machine.start

// Send a fire-and-forget message
process.send(machine.data, SomeMessage)

// Send a synchronous message with a reply
let reply = process.call(machine.data, 1000, SomeRequest)
pub fn state_timeout(milliseconds: Int) -> Action(msg, reply)

Create a StateTimeout action.

Sets a timeout that is automatically canceled when the state changes.

pub fn stop(
  reason: process.ExitReason,
) -> Step(state, data, msg, reply)

Create a Stop step indicating the state machine should terminate.

Example

stop(process.Normal)
pub fn with_state_enter(
  builder: Builder(state, data, msg, return, reply),
) -> Builder(state, data, msg, return, reply)

Enable state_enter calls.

When enabled, your event handler will be called with an Enter event whenever the state changes. This allows you to perform actions when entering a state (like setting timeouts, logging, etc).

The Enter event contains the previous state.

Example

fn handle_event(event, state, data) {
  case event, state {
    Enter(old), Active if old != Active -> {
      // Perform entry actions
      keep_state(data, [StateTimeout(30_000)])
    }
    _, _ -> keep_state(data, [])
  }
}

state_machine.new(Idle, data)
|> state_machine.with_state_enter()
|> state_machine.on_event(handle_event)
|> state_machine.start
Search Document