# `PgFlow`
[🔗](https://github.com/agoodway/pgflow/blob/v0.1.0/lib/pgflow.ex#L1)

PgFlow is an Elixir implementation of the pgflow workflow engine.

It provides a macro-based DSL for defining workflow DAGs that execute
on PostgreSQL using pgmq for task coordination.

## Compatibility

This Elixir implementation is compatible with pgflow core version 0.5.0.
It uses the same database schema and SQL functions as the TypeScript/Deno implementation,
allowing both to run side-by-side against the same database.

## Quick Start

    defmodule MyApp.Flows.ProcessOrder do
      use PgFlow.Flow

      @flow slug: :process_order, max_attempts: 3

      step :validate do
        fn input, _ctx ->
          # Validate order data
          %{valid: true, order_id: input["order_id"]}
        end
      end

      step :charge, depends_on: [:validate] do
        fn deps, _ctx ->
          # Charge the customer
          %{charged: true, amount: 100}
        end
      end

      step :fulfill, depends_on: [:charge] do
        fn deps, _ctx ->
          # Fulfill the order
          %{fulfilled: true}
        end
      end
    end

## Starting a Flow

    {:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

## Configuration

Add PgFlow to your supervision tree:

    children = [
      {PgFlow, repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder]}
    ]

See `PgFlow.Config` for configuration options.

# `child_spec`

```elixir
@spec child_spec(keyword()) :: Supervisor.child_spec()
```

Returns a child specification for starting PgFlow under a supervisor.

## Options

See `PgFlow.Config` for available options.

# `core_version`

```elixir
@spec core_version() :: String.t()
```

Returns the compatible pgflow core version.

This version indicates which pgflow database schema and SQL functions
this Elixir implementation is compatible with.

## Examples

    PgFlow.core_version()
    #=> "0.5.0"

# `delete_flow`

```elixir
@spec delete_flow(String.t()) :: :ok | {:error, term()}
```

Deletes a flow and all associated data (runs, tasks, queue).

## Examples

    PgFlow.delete_flow("acct_123_hubspot_sync_v1")

# `enqueue`

```elixir
@spec enqueue(module(), map()) :: {:ok, String.t()} | {:error, term()}
```

Enqueues a background job with the given input.

This is the primary API for dispatching jobs. Under the hood, jobs are
single-step flows, so this delegates to `start_flow/2`.

## Examples

    {:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"})

# `enqueue`

```elixir
@spec enqueue(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}
```

Enqueues a background job with options.

Supported options:

  * `:delay_seconds` - non-negative integer seconds before the job is available
  * `:scheduled_at` - `DateTime` when the job should become available

## Examples

    {:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, delay_seconds: 60)
    {:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, scheduled_at: ~U[2026-05-08 12:00:00Z])

# `enqueue_at`

```elixir
@spec enqueue_at(module(), map(), DateTime.t()) ::
  {:ok, String.t()} | {:error, term()}
```

Enqueues a background job that becomes available at `scheduled_at`.

Timestamps in the past enqueue the job for immediate execution. Any `DateTime`
time zone is accepted; PgFlow compares the scheduled timestamp as an instant.

## Examples

    {:ok, run_id} = PgFlow.enqueue_at(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, ~U[2026-05-08 12:00:00Z])

# `enqueue_in`

```elixir
@spec enqueue_in(module(), map(), non_neg_integer()) ::
  {:ok, String.t()} | {:error, term()}
```

Enqueues a background job that becomes available after `delay_seconds`.

PgFlow persists the run immediately, then delays the initial pgmq task
visibility so workers cannot execute it until the delay elapses. Values of
`0` enqueue the job for immediate execution.

## Examples

    {:ok, run_id} = PgFlow.enqueue_in(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, 60)

# `flow_exists?`

```elixir
@spec flow_exists?(String.t()) :: {:ok, boolean()} | {:error, term()}
```

Checks if a flow exists in the database.

## Examples

    PgFlow.flow_exists?("my_flow")
    # => {:ok, true}

# `get_flow`

```elixir
@spec get_flow(module() | atom()) :: {:ok, map()} | {:error, :not_found}
```

Gets the definition for a flow by module or slug.

## Examples

    {:ok, flow_def} = PgFlow.get_flow(MyApp.Flows.ProcessOrder)
    {:ok, flow_def} = PgFlow.get_flow(:process_order)
    {:error, :not_found} = PgFlow.get_flow(:unknown)

# `get_run`

```elixir
@spec get_run(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}
```

Gets a run by ID.

## Examples

    {:ok, run} = PgFlow.get_run("550e8400-e29b-41d4-a716-446655440000")
    {:error, :not_found} = PgFlow.get_run("nonexistent-id")

# `get_run_with_states`

```elixir
@spec get_run_with_states(String.t()) ::
  {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}
```

Gets a run with all step states preloaded.

## Examples

    {:ok, run} = PgFlow.get_run_with_states("550e8400-e29b-41d4-a716-446655440000")
    run.step_states  # => [%StepState{}, ...]

# `health_check`

```elixir
@spec health_check() :: %{status: :ok, workers: [map()], flows: [map()]}
```

Returns health check information.

## Examples

    PgFlow.health_check()
    #=> %{status: :ok, workers: [...], flows: [...]}

# `list_flows`

```elixir
@spec list_flows() :: [map()]
```

Lists all registered flows.

## Examples

    flows = PgFlow.list_flows()
    #=> [%{module: MyApp.Flows.ProcessOrder, slug: :process_order, ...}, ...]

# `start_flow`

```elixir
@spec start_flow(module() | atom() | String.t(), map()) ::
  {:ok, String.t()} | {:error, term()}
```

Starts a flow run with the given input.

The flow can be specified by module name, slug atom, or slug string.

## Examples

    {:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
    {:ok, run_id} = PgFlow.start_flow(:process_order, %{"order_id" => 123})

# `start_flow_sync`

```elixir
@spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) ::
  {:ok, PgFlow.Schema.Run.t()}
  | {:error, PgFlow.Schema.Run.t()}
  | {:error, :timeout}
  | {:error, term()}
```

Starts a flow and waits for completion.

Blocks until the flow completes or the timeout is reached.

## Options

  * `:timeout` - Maximum time to wait in milliseconds (default: 60_000)
  * `:poll_interval` - How often to check status in milliseconds (default: 500)

## Examples

    {:ok, run} = PgFlow.start_flow_sync(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
    {:error, run} = PgFlow.start_flow_sync(MyApp.Flows.FailingFlow, %{})

# `start_link`

```elixir
@spec start_link(keyword()) :: Supervisor.on_start()
```

Starts the PgFlow supervision tree.

## Options

See `PgFlow.Config` for available options.

## Examples

    PgFlow.start_link(repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder])

# `start_worker`

```elixir
@spec start_worker(
  module(),
  keyword()
) :: {:ok, pid()} | {:error, term()}
```

Starts a worker for the given flow.

## Options

  * `:poll_interval` - How often to poll for messages (default: 1000ms)
  * `:visibility_timeout` - How long to hold messages (default: 30s)

## Examples

    {:ok, pid} = PgFlow.start_worker(MyApp.Flows.ProcessOrder)

# `stop_worker`

```elixir
@spec stop_worker(module()) :: :ok | {:error, :not_found}
```

Stops a worker for the given flow.

## Examples

    :ok = PgFlow.stop_worker(MyApp.Flows.ProcessOrder)

# `upsert_flow`

```elixir
@spec upsert_flow(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}
```

Recompiles a flow definition at runtime.

Unlike the compile-time DSL (`use PgFlow.Flow`), this creates flow
definitions from plain data - for per-tenant automations and dynamic workflows.

If the flow already exists, this operation is destructive: existing
definition and historical run/task data for the slug are deleted first.

## Examples

     PgFlow.upsert_flow("acct_123_hubspot_sync_v1",
       max_attempts: 3,
       steps: [
         %{slug: "reshape", deps: []},
         %{slug: "create_contact", deps: ["reshape"]}
       ]
     )

---

*Consult [api-reference.md](api-reference.md) for complete listing*
