PgFlow.Flow (PgFlow v0.1.0)

A macro-based DSL for defining pgflow workflows.

This module provides a declarative way to define workflow steps with dependencies, retries, timeouts, and array processing. To use it, add use PgFlow.Flow to your flow module.

Example

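defmodule MyApp.Flows.Example do
  use PgFlow.Flow

  # Flow-level settings; individual steps can override the retry and timeout values.
  @flow queue: :example, max_attempts: 3, base_delay: 5, timeout: 60

  # Root step: no dependencies, so the handler receives the flow input.
  step :first do
    fn input, _ctx ->
      %{result: input["value"] * 2}
    end
  end

  # Dependent step: reads the output of :first from deps.
  step :second, depends_on: [:first] do
    fn deps, _ctx ->
      %{doubled: deps.first["result"]}
    end
  end

  # Map step: the handler runs once per item of the output of :second.
  map :process_items, array: :second do
    fn item, _ctx ->
      %{processed: item}
    end
  end
end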

Flow Options

The @flow module attribute accepts the following options:

  • :queue - (required) atom identifier for the flow queue (also accepts :slug as alias)
  • :max_attempts - maximum retry attempts for failed steps (default: 1)
  • :base_delay - base delay in seconds for exponential backoff (default: 1)
  • :timeout - step execution timeout in seconds (default: 30)
  • :cron - (optional) schedule this flow via pg_cron with sub-options:
    • :schedule - (required) cron schedule string (e.g., "@hourly", "0 9 * * *")
    • :input - (optional) static input map passed to each scheduled run

Step Options

Steps defined with step/2 or step/3 accept these options:

  • :depends_on - list of step atoms this step depends on
  • :handler - module implementing PgFlow.StepHandler (alternative to block)
  • :max_attempts - override flow-level max_attempts
  • :base_delay - override flow-level base_delay
  • :timeout - override flow-level timeout
  • :start_delay - seconds to delay before starting this step
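
The override rule for :max_attempts, :base_delay, and :timeout can be pictured as a layered keyword merge: documented defaults first, then the @flow options, then the options of the individual step. The sketch below is a minimal illustration under that assumption; StepOptionsSketch and effective/2 are hypothetical names, and how PgFlow resolves the values internally is not shown on this page.

```elixir
defmodule StepOptionsSketch do
  # Defaults as listed under Flow Options.
  @defaults [max_attempts: 1, base_delay: 1, timeout: 30]
  # Only these keys are described as overridable per step.
  @overridable [:max_attempts, :base_delay, :timeout]

  # Hypothetical helper, not part of PgFlow.
  # Later keyword lists win, so step options override flow options,
  # which in turn override the defaults.
  def effective(flow_opts, step_opts) do
    @defaults
    |> Keyword.merge(Keyword.take(flow_opts, @overridable))
    |> Keyword.merge(Keyword.take(step_opts, @overridable))
  end
end

flow_opts = [queue: :example, max_attempts: 3, base_delay: 5, timeout: 60]
step_opts = [depends_on: [:first], timeout: 120]

IO.inspect(StepOptionsSketch.effective(flow_opts, step_opts))
```

Here the step keeps max_attempts: 3 and base_delay: 5 from the flow and uses its own timeout: 120.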

Map Options

Map steps defined with map/2 or map/3 accept step options plus:

  • :array - step slug whose output array to process (for dependent maps)
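
Conceptually, a map step applies the same two-argument handler to every element of an array. The sketch below shows only that data shape using plain Enum.map/2; MapStepSketch is a hypothetical stand-in, and the real execution goes through the PgFlow queue rather than a local loop.

```elixir
defmodule MapStepSketch do
  # Hypothetical stand-in for a map step, not part of PgFlow.
  # Calls the handler once per item and collects the results in input order.
  def run(items, handler, ctx \\ %{})
      when is_list(items) and is_function(handler, 2) do
    Enum.map(items, fn item -> handler.(item, ctx) end)
  end
end

# Same handler shape as the map examples in this module's documentation.
handler = fn item, _ctx -> %{processed: item} end

IO.inspect(MapStepSketch.run([1, 2, 3], handler))
# prints [%{processed: 1}, %{processed: 2}, %{processed: 3}]
```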

Generated Functions

Calling use PgFlow.Flow generates the following functions in your flow module:

  • __pgflow_definition__/0 - returns a PgFlow.Flow.Definition struct
  • __pgflow_slug__/0 - returns the flow slug atom
  • __pgflow_steps__/0 - returns the raw step definitions
  • __pgflow_handler__/1 - pattern-matched functions for each step
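
As a rough illustration of how a DSL can collect declarations at compile time and then emit an introspection function like the ones above, the following self-contained sketch uses an accumulating module attribute and a @before_compile hook. MiniFlow, DemoFlow, and __mini_steps__/0 are hypothetical names; this shows the general technique only and is not PgFlow's actual implementation, which may work differently.

```elixir
defmodule MiniFlow do
  # Sketch of the general `use` + `@before_compile` technique.
  # All names in this module are hypothetical.
  defmacro __using__(_opts) do
    quote do
      import MiniFlow, only: [step: 2]
      Module.register_attribute(__MODULE__, :mini_steps, accumulate: true)
      @before_compile MiniFlow
    end
  end

  # Record the step slug and its options while the module body is compiled.
  defmacro step(slug, opts) do
    quote do
      @mini_steps {unquote(slug), unquote(opts)}
    end
  end

  # After the module body has been read, emit the introspection function.
  # Accumulated attributes come back newest first, so reverse them to
  # restore definition order.
  defmacro __before_compile__(env) do
    steps = env.module |> Module.get_attribute(:mini_steps) |> Enum.reverse()

    quote do
      def __mini_steps__, do: unquote(Macro.escape(steps))
    end
  end
end

defmodule DemoFlow do
  use MiniFlow

  step :first, []
  step :second, depends_on: [:first]
end

IO.inspect(DemoFlow.__mini_steps__())
```

The design choice worth noting is that the step list is fixed at compile time, so reading it at runtime is a plain function call with no lookup or registration step.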

Summary

Functions

map(slug, opts \\ [], list)

Defines an array processing step that executes a handler for each item.

step(slug, opts \\ [], list)

Defines a single execution step in the workflow.

Functions

map(slug, opts \\ [], list)

(macro)

Defines an array processing step that executes a handler for each item.

Examples

# Map over inline array
map :process_users do
  fn user, _ctx ->
    %{processed: process_user(user)}
  end
end

# Map over output from another step
map :enrich_items, array: :fetch_items do
  fn item, _ctx ->
    %{enriched: enrich(item)}
  end
end

# Map with module handler
map :validate_each, array: :items, handler: MyApp.ValidateItemHandler

# Map with custom settings
map :slow_processing, array: :items, timeout: 120 do
  fn item, _ctx ->
    %{result: slow_process(item)}
  end
end

step(slug, opts \\ [], list)

(macro)

Defines a single execution step in the workflow.

Examples

# Basic step with inline handler
step :fetch_data do
  fn input, _ctx ->
    %{data: fetch_from_api(input["url"])}
  end
end

# Step with dependencies
step :transform, depends_on: [:fetch_data] do
  fn deps, _ctx ->
    %{transformed: transform(deps.fetch_data["data"])}
  end
end

# Step with module handler
step :validate, handler: MyApp.ValidateHandler

# Step with custom retry settings
step :flaky_operation, max_attempts: 5, base_delay: 10 do
  fn _input, _ctx ->
    perform_operation()
  end
end