PgFlow.Worker.Executor (PgFlow v0.1.0)

Copy Markdown View Source

Task execution logic for flow steps.

This module handles the execution of step handlers with proper timeout handling, output serialization, and error handling. It manages the complete lifecycle of a task execution:

  1. Building execution context with run metadata
  2. Retrieving the handler function from the flow module
  3. Preparing input data (flow input for root steps, deps for dependent steps)
  4. Executing the handler with timeout protection
  5. Serializing output as JSON-compatible data
  6. Converting errors to serializable format

Handler Signatures

Step handlers receive different inputs based on their dependencies:

  • Root steps (no dependencies): fn input, ctx -> output end

  • Dependent steps: fn deps, ctx -> output end

    • deps is a map of dependency outputs keyed by step slug
    • ctx is the PgFlow.Context struct

Timeout Handling

Task execution is protected by a configurable timeout. If a handler exceeds the timeout, it is terminated and the task is marked as failed.

Output Serialization

Handler outputs must be JSON-serializable. The executor validates and converts outputs to ensure they can be stored in the database.

Usage

task = %{
  run_id: "550e8400-e29b-41d4-a716-446655440000",
  step_slug: :process,
  task_index: 0,
  attempt: 1,
  input: %{"key" => "value"},
  deps: %{}
}

case PgFlow.Worker.Executor.execute(MyFlow, task, MyApp.Repo) do
  {:ok, output} ->
    # Task succeeded, persist output
    PgFlow.Queries.Flows.complete_task(repo, task.run_id, task.step_slug, task.task_index, output)

  {:error, error_message} ->
    # Task failed, persist error
    PgFlow.Queries.Flows.fail_task(repo, task.run_id, task.step_slug, task.task_index, error_message)
end

Summary

Functions

Builds a PgFlow.Context struct for task execution.

Executes a flow step handler.

Executes a handler function with timeout protection.

Retrieves the handler function for a step.

Prepares input data for handler execution.

Converts an error to a serializable string representation.

Serializes handler output to ensure JSON compatibility.

Functions

build_context(task, repo)

@spec build_context(map(), Ecto.Repo.t()) ::
  {:ok, PgFlow.Context.t()} | {:error, term()}

Builds a PgFlow.Context struct for task execution.

The context provides step handlers with metadata about the current execution environment and utilities for accessing flow data.

Parameters

  • task - Task map with execution details
  • repo - The Ecto repository

Returns

  • {:ok, context} - Context struct ready for handler execution
  • {:error, reason} - Error if context creation fails

Examples

task = %{
  run_id: "550e8400-e29b-41d4-a716-446655440000",
  step_slug: :process,
  task_index: 0,
  attempt: 1,
  input: %{},
  deps: %{}
}

{:ok, ctx} = PgFlow.Worker.Executor.build_context(task, MyApp.Repo)
#=> {:ok, %PgFlow.Context{
#=>   run_id: "550e8400-e29b-41d4-a716-446655440000",
#=>   step_slug: :process,
#=>   task_index: 0,
#=>   attempt: 1,
#=>   repo: MyApp.Repo,
#=>   flow_input: :not_loaded
#=> }}

execute(flow_module, task, repo, opts \\ [])

@spec execute(module(), map(), Ecto.Repo.t(), keyword()) ::
  {:ok, map()} | {:error, String.t()}

Executes a flow step handler.

Parameters

  • flow_module - The flow module containing the step handlers
  • task - Task map with execution details
  • repo - The Ecto repository
  • opts - Options keyword list

Options

  • :timeout - Execution timeout in milliseconds (default: 30_000)

Task Structure

The task map must contain:

  • :run_id - Flow run UUID
  • :step_slug - Step identifier atom
  • :task_index - Task index (0 for single steps)
  • :attempt - Current attempt number
  • :input - Task input data map
  • :deps - Dependency outputs map

Returns

  • {:ok, output} - Handler succeeded, output is JSON-serializable
  • {:error, error_message} - Handler failed or timed out

Examples

task = %{
  run_id: "550e8400-e29b-41d4-a716-446655440000",
  step_slug: :fetch_user,
  task_index: 0,
  attempt: 1,
  input: %{"user_id" => 123},
  deps: %{}
}

{:ok, output} = PgFlow.Worker.Executor.execute(MyFlow, task, MyApp.Repo)
#=> {:ok, %{"name" => "John", "email" => "john@example.com"}}

# Handler that times out
{:error, msg} = PgFlow.Worker.Executor.execute(SlowFlow, task, MyApp.Repo, timeout: 100)
#=> {:error, "Task timed out after 100ms"}

execute_with_timeout(handler, input, ctx, timeout)

@spec execute_with_timeout(function(), map(), PgFlow.Context.t(), pos_integer()) ::
  {:ok, term()} | {:error, String.t()}

Executes a handler function with timeout protection.

Spawns the handler in a separate task and awaits the result with the specified timeout. If the timeout is exceeded, the task is killed and an error is returned.

Parameters

  • handler - Handler function to execute
  • input - Input data for the handler
  • ctx - Execution context
  • timeout - Timeout in milliseconds

Returns

  • {:ok, output} - Handler completed successfully
  • {:error, reason} - Handler failed or timed out

Examples

handler = fn input, _ctx -> %{result: input["value"] * 2} end
input = %{"value" => 21}

{:ok, output} = PgFlow.Worker.Executor.execute_with_timeout(handler, input, ctx, 5000)
#=> {:ok, %{result: 42}}

slow_handler = fn _input, _ctx -> Process.sleep(10_000); %{} end
{:error, msg} = PgFlow.Worker.Executor.execute_with_timeout(slow_handler, input, ctx, 100)
#=> {:error, "Task timed out after 100ms"}

get_handler(flow_module, step_slug)

@spec get_handler(module(), atom()) :: {:ok, function()} | {:error, String.t()}

Retrieves the handler function for a step.

Calls the flow module's __pgflow_handler__/1 function to get the handler function for the specified step.

Parameters

  • flow_module - The flow module
  • step_slug - Step identifier atom

Returns

  • {:ok, handler} - Handler function
  • {:error, reason} - Error if handler not found

Examples

{:ok, handler} = PgFlow.Worker.Executor.get_handler(MyFlow, :process)
#=> {:ok, #Function<...>}

{:error, msg} = PgFlow.Worker.Executor.get_handler(MyFlow, :nonexistent)
#=> {:error, "No handler defined for step: :nonexistent"}

prepare_input(map, ctx)

@spec prepare_input(map(), PgFlow.Context.t()) :: {:ok, map()} | {:error, term()}

Prepares input data for handler execution.

For root steps (no dependencies), returns the task input directly. For dependent steps, returns the deps map containing outputs from dependencies.

Parameters

  • task - Task map with input and deps
  • ctx - Execution context (currently unused, for future extensions)

Returns

  • {:ok, handler_input} - Input data for the handler
  • {:error, reason} - Error if input preparation fails

Examples

# Root step - uses input
task = %{input: %{"key" => "value"}, deps: %{}}
{:ok, input} = PgFlow.Worker.Executor.prepare_input(task, ctx)
#=> {:ok, %{"key" => "value"}}

# Dependent step - uses deps
task = %{input: %{}, deps: %{fetch: %{"result" => 123}}}
{:ok, input} = PgFlow.Worker.Executor.prepare_input(task, ctx)
#=> {:ok, %{fetch: %{"result" => 123}}}

serialize_error(error)

@spec serialize_error(term()) :: String.t()

Converts an error to a serializable string representation.

Handles various error types including strings, atoms, exceptions, and complex terms. Ensures errors can be persisted to the database.

Parameters

  • error - Error value to serialize

Returns

A string representation of the error.

Examples

PgFlow.Worker.Executor.serialize_error("Connection failed")
#=> "Connection failed"

PgFlow.Worker.Executor.serialize_error(:timeout)
#=> "timeout"

PgFlow.Worker.Executor.serialize_error(%RuntimeError{message: "Something broke"})
#=> "RuntimeError: Something broke"

PgFlow.Worker.Executor.serialize_error({:error, :not_found})
#=> "{:error, :not_found}"

serialize_output(output)

@spec serialize_output(term()) :: {:ok, map() | list()} | {:error, String.t()}

Serializes handler output to ensure JSON compatibility.

Validates that the output can be encoded as JSON. This ensures outputs can be stored in the database and passed to dependent steps.

Parameters

  • output - Handler output to serialize

Returns

  • {:ok, output} - Output is JSON-serializable
  • {:error, reason} - Output cannot be serialized

Examples

{:ok, output} = PgFlow.Worker.Executor.serialize_output(%{key: "value"})
#=> {:ok, %{key: "value"}}

{:ok, output} = PgFlow.Worker.Executor.serialize_output([1, 2, 3])
#=> {:ok, [1, 2, 3]}

{:error, msg} = PgFlow.Worker.Executor.serialize_output(%{pid: self()})
#=> {:error, "Output is not JSON-serializable: ..."}