PgFlow.Context (PgFlow v0.1.0)


Context struct passed to step handler functions.

The context provides metadata about the current execution environment and utilities for accessing flow data.

Fields

  • :run_id - UUID of the current flow run
  • :step_slug - Slug of the current step being executed
  • :task_index - Index of the task within the step (0 for single steps)
  • :attempt - Current attempt number (1-indexed)
  • :flow_input - Lazy-loaded flow input; holds :not_loaded until fetched (use get_flow_input/1 to access)
  • :repo - Ecto repository module for database access

Usage

Step handlers receive the context as their second argument:

step :process, depends_on: [:fetch] do
  fn deps, ctx ->
    # Access context fields
    IO.puts("Running step #{ctx.step_slug} for run #{ctx.run_id}")
    IO.puts("This is attempt #{ctx.attempt}")

    # Get flow input if needed
    input = PgFlow.Context.get_flow_input(ctx)

    # Use dependencies from previous steps
    %{result: deps.fetch.data}
  end
end

Summary

Functions

  • get_flow_input(ctx) - Loads the flow input from the database.
  • new(opts) - Creates a new context struct.
  • preload_flow_input(ctx) - Preloads the flow input into the context.

Types

t()

@type t() :: %PgFlow.Context{
  attempt: pos_integer(),
  flow_input: map() | :not_loaded,
  repo: module(),
  run_id: Ecto.UUID.t(),
  step_slug: atom(),
  task_index: non_neg_integer()
}

Functions

get_flow_input(ctx)

@spec get_flow_input(t()) :: map()

Loads the flow input from the database.

The flow input is lazily loaded to avoid unnecessary database queries when the input is not needed by the step handler.

Returns the flow input as a map, or raises if the run cannot be found.

Examples

input = PgFlow.Context.get_flow_input(ctx)
#=> %{"order_id" => 123, "customer_id" => 456}
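
The lazy-loading idea can be pictured with a small self-contained sketch. This is not PgFlow's implementation: the LazyInputSketch module, its :loader field, and the zero-arity function standing in for the database query are all hypothetical, chosen so the example runs without a repo. Only the :not_loaded sentinel is taken from the t() type.

```elixir
defmodule LazyInputSketch do
  # Hypothetical stand-in for a context; :loader replaces the real repo query.
  defstruct flow_input: :not_loaded, loader: nil

  # Input not loaded yet: run the loader (the simulated query) on demand.
  def get_flow_input(%__MODULE__{flow_input: :not_loaded, loader: loader}), do: loader.()

  # Input already stored in the struct: return it without calling the loader.
  def get_flow_input(%__MODULE__{flow_input: input}), do: input
end

# The loader only runs when a handler actually asks for the input.
ctx = %LazyInputSketch{loader: fn -> %{"order_id" => 123} end}
input = LazyInputSketch.get_flow_input(ctx)
IO.inspect(input)
```

A handler that never calls get_flow_input/1 never triggers the loader, which is the saving the lazy design is after.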

new(opts)

@spec new(keyword()) :: t()

Creates a new context struct.

Examples

ctx = PgFlow.Context.new(
  run_id: "550e8400-e29b-41d4-a716-446655440000",
  step_slug: :process,
  task_index: 0,
  attempt: 1,
  repo: MyApp.Repo
)

preload_flow_input(ctx)

@spec preload_flow_input(t()) :: t()

Preloads the flow input into the context.

Use this to load the flow input eagerly, for example when processing multiple tasks that all need access to it.

Examples

ctx = PgFlow.Context.preload_flow_input(ctx)
# flow_input is now loaded and cached in the context
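
The benefit of preloading once before several reads can be sketched in isolation. The PreloadSketch module, its :loader field, and the Agent-based query counter below are hypothetical and only illustrate the pattern; they do not reflect how PgFlow stores or fetches the input.

```elixir
defmodule PreloadSketch do
  # Hypothetical stand-in for a context; :loader replaces the real repo query.
  defstruct flow_input: :not_loaded, loader: nil

  # Eagerly load the input and store it in the returned struct.
  def preload_flow_input(%__MODULE__{flow_input: :not_loaded, loader: loader} = ctx),
    do: %{ctx | flow_input: loader.()}

  # Already loaded: nothing to do.
  def preload_flow_input(%__MODULE__{} = ctx), do: ctx

  def get_flow_input(%__MODULE__{flow_input: :not_loaded, loader: loader}), do: loader.()
  def get_flow_input(%__MODULE__{flow_input: input}), do: input
end

# Count how often the simulated query runs.
{:ok, counter} = Agent.start_link(fn -> 0 end)

loader = fn ->
  Agent.update(counter, &(&1 + 1))
  %{"customer_id" => 456}
end

ctx = PreloadSketch.preload_flow_input(%PreloadSketch{loader: loader})

# Three task-like reads reuse the cached input instead of re-querying.
inputs = for _task_index <- 0..2, do: PreloadSketch.get_flow_input(ctx)

query_count = Agent.get(counter, & &1)
IO.inspect({length(inputs), query_count})
# prints {3, 1}
```

Because structs are immutable, the loaded input is only available through the struct returned by the preload call, so the rebinding of ctx matters.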