Hephaestus.Core.Engine (hephaestus v0.3.1)


Pure functional workflow engine for the callback-based v2 API.

Summary

Functions

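activate_transitions(instance, from_module, event)

Resolves and activates outgoing transitions from a completed step.

advance(instance)

Advances a workflow instance to its next state.

check_completion(instance)

Checks whether the workflow instance has completed.

complete_step(instance, step_module, event, context_updates, metadata_updates \\ %{})

Marks a step as completed and stores its context updates.

execute_step(instance, step_module)

Executes a step module's c:Hephaestus.Step.execute/3 callback.

resume_step(instance, step_module, event)

Resumes a waiting workflow instance after an asynchronous step completes.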

Functions

activate_transitions(instance, from_module, event)

@spec activate_transitions(Hephaestus.Core.Instance.t(), module(), atom()) ::
  Hephaestus.Core.Instance.t()

Resolves and activates outgoing transitions from a completed step.

Calls the workflow's transit/3 callback to determine the next step(s) for the given from_module and event. Each target step is activated only if all of its predecessors have already completed (join semantics).

Supports single targets, {target, config} tuples, and lists of targets for parallel fan-out.
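The target shapes and the join rule described above can be sketched in isolation. This is a minimal illustration, not the engine's actual code: the TransitionSketch module, its function names, and the predecessors map are all hypothetical.

```elixir
defmodule TransitionSketch do
  # Hypothetical helper module; not part of Hephaestus.

  # Normalize the three documented target shapes (a single module, a
  # {module, config} tuple, or a list of either) into {module, config} pairs.
  def normalize(targets) when is_list(targets), do: Enum.flat_map(targets, &normalize/1)
  def normalize({target, config}) when is_atom(target), do: [{target, config}]
  def normalize(target) when is_atom(target), do: [{target, nil}]

  # Join semantics: a target is ready only when all of its predecessors
  # are in the completed set. `predecessors` maps a target to a MapSet of
  # the steps that lead into it (an assumed representation).
  def ready?(target, predecessors, completed) do
    predecessors
    |> Map.get(target, MapSet.new())
    |> MapSet.subset?(completed)
  end
end
```

Under these assumptions, a step with two predecessors stays inactive after the first one completes and becomes ready once both are in the completed set.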

Parameters

  • instance - a Hephaestus.Core.Instance struct
  • from_module - the step module that just completed
  • event - the event atom returned by the completed step

Returns

The updated Hephaestus.Core.Instance struct with newly activated steps.

advance(instance)

@spec advance(Hephaestus.Core.Instance.t()) ::
  {:ok, Hephaestus.Core.Instance.t()} | {:error, term()}

Advances a workflow instance to its next state.

If the instance is :pending, it is started by activating the workflow's initial step. If the instance is :waiting or has active steps still running, it is returned unchanged. Otherwise, completion is checked and the instance may transition to :completed.

Parameters

  • instance - a Hephaestus.Core.Instance struct

Returns

  • {:ok, instance} - the (possibly updated) instance
  • {:error, reason} - if advancing fails

Examples

iex> instance = Instance.new(MyApp.Workflows.OrderFlow, 1, %{order_id: 123}, "orderid::123")
iex> {:ok, advanced} = Engine.advance(instance)
iex> advanced.status
:running
iex> MapSet.member?(advanced.active_steps, MyApp.Steps.ValidateOrder)
true

A :waiting instance is returned unchanged:

iex> {:ok, ^waiting_instance} = Engine.advance(waiting_instance)
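The branching described for advance/1 can be pictured with plain maps. This is only a sketch under stated assumptions: AdvanceSketch is a hypothetical module, the instance is modeled as a bare map, and the initial step is passed in explicitly, whereas the real function takes only the instance.

```elixir
defmodule AdvanceSketch do
  # Hypothetical stand-in for the instance: a map with :status,
  # :active_steps (a MapSet), and :current_step.

  # :pending -> start the workflow by activating the initial step.
  def advance(%{status: :pending} = inst, initial_step) do
    {:ok, %{inst | status: :running, active_steps: MapSet.new([initial_step])}}
  end

  # :waiting -> returned unchanged until an asynchronous step resumes it.
  def advance(%{status: :waiting} = inst, _initial_step), do: {:ok, inst}

  # Any other status -> run the completion check.
  def advance(inst, _initial_step), do: {:ok, check_completion(inst)}

  # Completed only when nothing is active and the instance is not waiting;
  # otherwise the instance is returned as-is.
  def check_completion(%{status: status, active_steps: active} = inst) do
    if status != :waiting and MapSet.size(active) == 0 do
      %{inst | status: :completed, current_step: nil}
    else
      inst
    end
  end
end
```

Because every clause returns a new map rather than mutating state, calling the sketch repeatedly on an instance with active steps is harmless, which mirrors the "returned unchanged" behavior documented above.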

check_completion(instance)

Checks whether the workflow instance has completed.

If there are no active steps and the instance is not :waiting, the status is set to :completed and current_step is cleared. Otherwise, the instance is returned unchanged.

Parameters

  • instance - a Hephaestus.Core.Instance struct

Returns

The (possibly updated) Hephaestus.Core.Instance struct.

complete_step(instance, step_module, event, context_updates, metadata_updates \\ %{})

@spec complete_step(Hephaestus.Core.Instance.t(), module(), atom(), map(), map()) ::
  Hephaestus.Core.Instance.t()

Marks a step as completed and stores its context updates.

Moves step_module from the active set to the completed set, merges context_updates into the instance context under the step's context key, and removes the step's runtime configuration.

When metadata_updates is provided, it is merged into the instance's runtime_metadata map for observability purposes.

Parameters

  • instance - a Hephaestus.Core.Instance struct
  • step_module - the step module that finished
  • event - the completion event atom (unused internally, passed through for consistency)
  • context_updates - a map of results to store in the instance context
  • metadata_updates - an optional map of runtime metadata to merge into the instance (default: %{})

Returns

The updated Hephaestus.Core.Instance struct.

Examples

iex> instance = Engine.complete_step(instance, MyApp.Steps.ValidateOrder, :done, %{item_count: 3})
iex> MapSet.member?(instance.completed_steps, MyApp.Steps.ValidateOrder)
true
iex> instance.context.steps.validate_order.item_count
3

With runtime metadata:

iex> instance = Engine.complete_step(instance, MyApp.Steps.ProcessOrder, :done, %{total: 100}, %{"order_id" => 42})
iex> instance.runtime_metadata
%{"order_id" => 42}
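The bookkeeping described for complete_step/5 might look roughly like the following when modeled on a plain map. Everything here is an assumption made for illustration: the CompleteStepSketch module, the step_configs field name, and the rule that derives the context key from the last segment of the module name (chosen because it reproduces the validate_order key in the example above).

```elixir
defmodule CompleteStepSketch do
  # Hypothetical key derivation: MyApp.Steps.ValidateOrder -> :validate_order.
  def context_key(step_module) do
    step_module
    |> Module.split()
    |> List.last()
    |> Macro.underscore()
    |> String.to_atom()
  end

  # The event argument is omitted because the docs note it is unused internally.
  def complete_step(inst, step_module, context_updates, metadata_updates \\ %{}) do
    key = context_key(step_module)

    # Merge results under context.steps[key]; a missing key starts from %{}.
    context =
      update_in(inst.context, [:steps, key], fn existing ->
        Map.merge(existing || %{}, context_updates)
      end)

    %{
      inst
      | active_steps: MapSet.delete(inst.active_steps, step_module),
        completed_steps: MapSet.put(inst.completed_steps, step_module),
        context: context,
        # Assumed field holding per-step runtime configuration.
        step_configs: Map.delete(inst.step_configs, step_module),
        runtime_metadata: Map.merge(inst.runtime_metadata, metadata_updates)
    }
  end
end
```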

execute_step(instance, step_module)

@spec execute_step(Hephaestus.Core.Instance.t(), module()) ::
  {:ok, atom()}
  | {:ok, atom(), map()}
  | {:ok, atom(), map(), map()}
  | {:async}
  | {:error, term()}

Executes a step module's c:Hephaestus.Step.execute/3 callback.

Looks up the step's configuration from the instance and delegates to step_module.execute/3, passing the instance, its config, and the current context.

Parameters

  • instance - a Hephaestus.Core.Instance struct
  • step_module - the step module to execute

Returns

  • {:ok, event} - step completed synchronously with the given event atom
  • {:ok, event, context_updates} - step completed with context updates
  • {:ok, event, context_updates, metadata_updates} - step completed with context and runtime metadata updates
  • {:async} - step will complete asynchronously (instance enters :waiting)
  • {:error, reason} - step execution failed

Examples

iex> {:ok, instance} = Engine.advance(instance)
iex> {:ok, :done} = Engine.execute_step(instance, MyApp.Steps.ValidateOrder)

A step may return context updates along with the event:

iex> {:ok, :done, %{item_count: 3}} = Engine.execute_step(instance, MyApp.Steps.ProcessOrder)

Asynchronous steps return {:async}:

iex> {:async} = Engine.execute_step(instance, MyApp.Steps.WaitForPayment)
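Callers usually need to handle all five return shapes. One possible approach, shown here as a sketch, is to normalize them into a single form before deciding what to do next. StepResultSketch and the :completed, :waiting, and :failed tags are hypothetical names, not part of the library.

```elixir
defmodule StepResultSketch do
  # Collapse the documented execute_step/2 return shapes into one of:
  #   {:completed, event, context_updates, metadata_updates}
  #   :waiting
  #   {:failed, reason}

  def normalize({:ok, event}) when is_atom(event),
    do: {:completed, event, %{}, %{}}

  def normalize({:ok, event, context}) when is_atom(event) and is_map(context),
    do: {:completed, event, context, %{}}

  def normalize({:ok, event, context, metadata})
      when is_atom(event) and is_map(context) and is_map(metadata),
      do: {:completed, event, context, metadata}

  # Asynchronous steps finish later; the instance would enter :waiting.
  def normalize({:async}), do: :waiting

  def normalize({:error, reason}), do: {:failed, reason}
end
```

With a uniform {:completed, event, context_updates, metadata_updates} tuple, the follow-up calls to complete_step/5 and activate_transitions/3 can take their arguments from one place instead of branching on tuple size.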

resume_step(instance, step_module, event)

Resumes a waiting workflow instance after an asynchronous step completes.

Completes the given step, activates any outgoing transitions for event, and sets the instance status back to :running. If the instance is not in the :waiting status, it is returned unchanged.

Parameters

  • instance - a Hephaestus.Core.Instance struct (expected to be :waiting; an instance in any other status is returned unchanged)
  • step_module - the step module that produced the resume event
  • event - the event atom that triggered the resume

Returns

The updated Hephaestus.Core.Instance struct.

Examples

iex> instance = Engine.resume_step(waiting_instance, MyApp.Steps.WaitForPayment, :payment_confirmed)
iex> instance.status
:running

If the instance is not :waiting, it is returned unchanged:

iex> unchanged = Engine.resume_step(running_instance, MyApp.Steps.SomeStep, :done)
iex> unchanged == running_instance
true
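The resume sequence (complete the step, activate its outgoing transitions, return to :running) can be sketched on a plain map. This is an illustration only: ResumeSketch is hypothetical, the instance is a bare map, and the workflow's transit callback is approximated by a two-argument function passed in by the caller.

```elixir
defmodule ResumeSketch do
  # Only a :waiting instance is resumed.
  def resume(%{status: :waiting} = inst, step_module, event, transit) do
    # `transit` is a hypothetical (step_module, event) -> target(s) function.
    targets = List.wrap(transit.(step_module, event))

    active =
      inst.active_steps
      |> MapSet.delete(step_module)
      |> MapSet.union(MapSet.new(targets))

    %{
      inst
      | status: :running,
        active_steps: active,
        completed_steps: MapSet.put(inst.completed_steps, step_module)
    }
  end

  # Any other status: return the instance unchanged.
  def resume(inst, _step_module, _event, _transit), do: inst
end
```

The sketch leaves out join checks and context updates to keep the control flow visible; the guard on :waiting in the first clause is what makes a stray resume call on a running instance a no-op.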