Task execution logic for flow steps.
This module handles the execution of step handlers with proper timeout handling, output serialization, and error handling. It manages the complete lifecycle of a task execution:
- Building execution context with run metadata
- Retrieving the handler function from the flow module
- Preparing input data (flow input for root steps, deps for dependent steps)
- Executing the handler with timeout protection
- Serializing output as JSON-compatible data
- Converting errors to serializable format
Handler Signatures
Step handlers receive different inputs based on their dependencies:
Root steps (no dependencies):
fn input, ctx -> output endinputis the flow input mapctxis thePgFlow.Contextstruct
Dependent steps:
fn deps, ctx -> output enddepsis a map of dependency outputs keyed by step slugctxis thePgFlow.Contextstruct
Timeout Handling
Task execution is protected by a configurable timeout. If a handler exceeds the timeout, it is terminated and the task is marked as failed.
Output Serialization
Handler outputs must be JSON-serializable. The executor validates and converts outputs to ensure they can be stored in the database.
Usage
task = %{
run_id: "550e8400-e29b-41d4-a716-446655440000",
step_slug: :process,
task_index: 0,
attempt: 1,
input: %{"key" => "value"},
deps: %{}
}
case PgFlow.Worker.Executor.execute(MyFlow, task, MyApp.Repo) do
{:ok, output} ->
# Task succeeded, persist output
PgFlow.Queries.Flows.complete_task(repo, task.run_id, task.step_slug, task.task_index, output)
{:error, error_message} ->
# Task failed, persist error
PgFlow.Queries.Flows.fail_task(repo, task.run_id, task.step_slug, task.task_index, error_message)
end
Summary
Functions
Builds a PgFlow.Context struct for task execution.
Executes a flow step handler.
Executes a handler function with timeout protection.
Retrieves the handler function for a step.
Prepares input data for handler execution.
Converts an error to a serializable string representation.
Serializes handler output to ensure JSON compatibility.
Functions
@spec build_context(map(), Ecto.Repo.t()) :: {:ok, PgFlow.Context.t()} | {:error, term()}
Builds a PgFlow.Context struct for task execution.
The context provides step handlers with metadata about the current execution environment and utilities for accessing flow data.
Parameters
task- Task map with execution detailsrepo- The Ecto repository
Returns
{:ok, context}- Context struct ready for handler execution{:error, reason}- Error if context creation fails
Examples
task = %{
run_id: "550e8400-e29b-41d4-a716-446655440000",
step_slug: :process,
task_index: 0,
attempt: 1,
input: %{},
deps: %{}
}
{:ok, ctx} = PgFlow.Worker.Executor.build_context(task, MyApp.Repo)
#=> {:ok, %PgFlow.Context{
#=> run_id: "550e8400-e29b-41d4-a716-446655440000",
#=> step_slug: :process,
#=> task_index: 0,
#=> attempt: 1,
#=> repo: MyApp.Repo,
#=> flow_input: :not_loaded
#=> }}
Executes a flow step handler.
Parameters
flow_module- The flow module containing the step handlerstask- Task map with execution detailsrepo- The Ecto repositoryopts- Options keyword list
Options
:timeout- Execution timeout in milliseconds (default: 30_000)
Task Structure
The task map must contain:
:run_id- Flow run UUID:step_slug- Step identifier atom:task_index- Task index (0 for single steps):attempt- Current attempt number:input- Task input data map:deps- Dependency outputs map
Returns
{:ok, output}- Handler succeeded, output is JSON-serializable{:error, error_message}- Handler failed or timed out
Examples
task = %{
run_id: "550e8400-e29b-41d4-a716-446655440000",
step_slug: :fetch_user,
task_index: 0,
attempt: 1,
input: %{"user_id" => 123},
deps: %{}
}
{:ok, output} = PgFlow.Worker.Executor.execute(MyFlow, task, MyApp.Repo)
#=> {:ok, %{"name" => "John", "email" => "john@example.com"}}
# Handler that times out
{:error, msg} = PgFlow.Worker.Executor.execute(SlowFlow, task, MyApp.Repo, timeout: 100)
#=> {:error, "Task timed out after 100ms"}
@spec execute_with_timeout(function(), map(), PgFlow.Context.t(), pos_integer()) :: {:ok, term()} | {:error, String.t()}
Executes a handler function with timeout protection.
Spawns the handler in a separate task and awaits the result with the specified timeout. If the timeout is exceeded, the task is killed and an error is returned.
Parameters
handler- Handler function to executeinput- Input data for the handlerctx- Execution contexttimeout- Timeout in milliseconds
Returns
{:ok, output}- Handler completed successfully{:error, reason}- Handler failed or timed out
Examples
handler = fn input, _ctx -> %{result: input["value"] * 2} end
input = %{"value" => 21}
{:ok, output} = PgFlow.Worker.Executor.execute_with_timeout(handler, input, ctx, 5000)
#=> {:ok, %{result: 42}}
slow_handler = fn _input, _ctx -> Process.sleep(10_000); %{} end
{:error, msg} = PgFlow.Worker.Executor.execute_with_timeout(slow_handler, input, ctx, 100)
#=> {:error, "Task timed out after 100ms"}
Retrieves the handler function for a step.
Calls the flow module's __pgflow_handler__/1 function to get the
handler function for the specified step.
Parameters
flow_module- The flow modulestep_slug- Step identifier atom
Returns
{:ok, handler}- Handler function{:error, reason}- Error if handler not found
Examples
{:ok, handler} = PgFlow.Worker.Executor.get_handler(MyFlow, :process)
#=> {:ok, #Function<...>}
{:error, msg} = PgFlow.Worker.Executor.get_handler(MyFlow, :nonexistent)
#=> {:error, "No handler defined for step: :nonexistent"}
@spec prepare_input(map(), PgFlow.Context.t()) :: {:ok, map()} | {:error, term()}
Prepares input data for handler execution.
For root steps (no dependencies), returns the task input directly. For dependent steps, returns the deps map containing outputs from dependencies.
Parameters
task- Task map with input and depsctx- Execution context (currently unused, for future extensions)
Returns
{:ok, handler_input}- Input data for the handler{:error, reason}- Error if input preparation fails
Examples
# Root step - uses input
task = %{input: %{"key" => "value"}, deps: %{}}
{:ok, input} = PgFlow.Worker.Executor.prepare_input(task, ctx)
#=> {:ok, %{"key" => "value"}}
# Dependent step - uses deps
task = %{input: %{}, deps: %{fetch: %{"result" => 123}}}
{:ok, input} = PgFlow.Worker.Executor.prepare_input(task, ctx)
#=> {:ok, %{fetch: %{"result" => 123}}}
Converts an error to a serializable string representation.
Handles various error types including strings, atoms, exceptions, and complex terms. Ensures errors can be persisted to the database.
Parameters
error- Error value to serialize
Returns
A string representation of the error.
Examples
PgFlow.Worker.Executor.serialize_error("Connection failed")
#=> "Connection failed"
PgFlow.Worker.Executor.serialize_error(:timeout)
#=> "timeout"
PgFlow.Worker.Executor.serialize_error(%RuntimeError{message: "Something broke"})
#=> "RuntimeError: Something broke"
PgFlow.Worker.Executor.serialize_error({:error, :not_found})
#=> "{:error, :not_found}"
Serializes handler output to ensure JSON compatibility.
Validates that the output can be encoded as JSON. This ensures outputs can be stored in the database and passed to dependent steps.
Parameters
output- Handler output to serialize
Returns
{:ok, output}- Output is JSON-serializable{:error, reason}- Output cannot be serialized
Examples
{:ok, output} = PgFlow.Worker.Executor.serialize_output(%{key: "value"})
#=> {:ok, %{key: "value"}}
{:ok, output} = PgFlow.Worker.Executor.serialize_output([1, 2, 3])
#=> {:ok, [1, 2, 3]}
{:error, msg} = PgFlow.Worker.Executor.serialize_output(%{pid: self()})
#=> {:error, "Output is not JSON-serializable: ..."}