Hephaestus.Steps.Step behaviour (hephaestus v0.3.1)

Copy Markdown View Source

Behaviour for workflow step implementations.

Every step in a workflow must implement this behaviour. The execute/3 callback receives the current instance, optional config, and the execution context.

Return values

  • {:ok, event} - step completed synchronously, emitting the named event
  • {:ok, event, context_updates} - completed with data to store in context
  • {:ok, event, context_updates, metadata_updates} - completed with context and runtime metadata updates
  • {:async} - step is async (e.g., waiting for external event or timer)
  • {:error, reason} - step failed

Example

defmodule MyApp.Steps.ValidateOrder do
  @behaviour Hephaestus.Steps.Step

  @impl true
  def events, do: [:valid, :invalid]

  @impl true
  def execute(_instance, _config, context) do
    if context.initial[:items] && length(context.initial.items) > 0 do
      {:ok, :valid, %{item_count: length(context.initial.items)}}
    else
      {:ok, :invalid}
    end
  end
end

Summary

Types

Optional configuration map passed to a step's execute/3 callback, or nil if no config is set.

A map of key-value pairs to merge into the workflow context after a step completes.

An atom representing the outcome of a step execution, used to determine the next transition in the workflow.

A map of key-value pairs to merge into the instance's runtime metadata for observability.

The return type of execute/3.

Retry configuration controlling how the runner retries a failed step.

Callbacks

Returns the list of events this step can emit.

Executes the step logic.

Returns the retry configuration for this step.

Returns a custom atom key identifying this step.

Types

config()

@type config() :: map() | nil

Optional configuration map passed to a step's execute/3 callback, or nil if no config is set.

context_updates()

@type context_updates() :: map()

A map of key-value pairs to merge into the workflow context after a step completes.

event()

@type event() :: atom()

An atom representing the outcome of a step execution, used to determine the next transition in the workflow.

metadata_updates()

@type metadata_updates() :: map()

A map of key-value pairs to merge into the instance's runtime metadata for observability.

result()

@type result() ::
  {:ok, event()}
  | {:ok, event(), context_updates()}
  | {:ok, event(), context_updates(), metadata_updates()}
  | {:async}
  | {:error, term()}

The return type of execute/3.

  • {:ok, event} — step completed synchronously, emitting the named event.
  • {:ok, event, context_updates} — completed with additional data to store in the workflow context.
  • {:ok, event, context_updates, metadata_updates} — completed with context and runtime metadata updates.
  • {:async} — step is asynchronous and will be resumed later (e.g., after a timer or external event).
  • {:error, reason} — step failed with the given reason.

retry_config()

@type retry_config() :: %{
  max_attempts: pos_integer(),
  backoff: :exponential | :linear | :constant,
  max_backoff: pos_integer()
}

Retry configuration controlling how the runner retries a failed step.

Callbacks

events()

@callback events() :: [event()]

Returns the list of events this step can emit.

The workflow engine uses this list to validate that transitions defined in the workflow graph match the events the step actually produces. Each event atom should correspond to a possible outcome of execute/3.

Examples

@impl true
def events, do: [:done]

A step with multiple outcomes:

@impl true
def events, do: [:approved, :rejected]

execute(instance, config, context)

@callback execute(
  instance :: Hephaestus.Core.Instance.t(),
  config :: config(),
  context :: Hephaestus.Core.Context.t()
) :: result()

Executes the step logic.

Receives the current workflow instance, an optional config map defined in the workflow graph for this step, and the execution context containing initial data and results from previous steps.

Must return a result/0 tuple indicating the outcome.

Examples

Synchronous step returning an event:

@impl true
def execute(_instance, _config, _context), do: {:ok, :done}

Step that reads context and returns updates:

@impl true
def execute(_instance, _config, context) do
  items = context.initial.items
  {:ok, :done, %{item_count: length(items)}}
end

Step that emits runtime metadata for observability:

@impl true
def execute(_instance, _config, context) do
  order_id = context.initial.order_id
  {:ok, :done, %{total: 100}, %{"order_id" => order_id}}
end

Asynchronous step (waits for external resume):

@impl true
def execute(_instance, _config, _context), do: {:async}

retry_config()

(optional)
@callback retry_config() :: retry_config()

Returns the retry configuration for this step.

Optional. When implemented, the runner will automatically retry the step on failure according to the returned configuration, which specifies the maximum number of attempts, backoff strategy (:exponential, :linear, or :constant), and maximum backoff interval in milliseconds.

Examples

@impl true
def retry_config do
  %{max_attempts: 3, backoff: :exponential, max_backoff: 30_000}
end

step_key()

(optional)
@callback step_key() :: atom()

Returns a custom atom key identifying this step.

Optional. When not implemented, the workflow engine derives the key from the module name. Override this to provide a shorter or more meaningful identifier for storage and logging.

Examples

@impl true
def step_key, do: :validate