# `PgFlow.Client`
[🔗](https://github.com/agoodway/pgflow/blob/v0.1.0/lib/pgflow/client.ex#L1)

Client API for interacting with PgFlow runs.

Provides functions to start flows, query run status, and wait for completion.

## Usage

    # Start a flow asynchronously
    {:ok, run_id} = PgFlow.Client.start_flow(:my_flow, %{"order_id" => 123})

    # Start a flow and wait for completion
    {:ok, run} = PgFlow.Client.start_flow_sync(:my_flow, %{"order_id" => 123}, timeout: 30_000)

    # Get run details
    {:ok, run} = PgFlow.Client.get_run(run_id)

    # Get run with step states preloaded
    {:ok, run} = PgFlow.Client.get_run_with_states(run_id)

# `delete_flow`

```elixir
@spec delete_flow(String.t()) :: :ok | {:error, term()}
```

Deletes a flow and all associated data (runs, tasks, queue).

This permanently removes the flow definition and all historical run data.
Intended for cleaning up retired flow versions.

## Examples

    PgFlow.Client.delete_flow("acct_123_hubspot_sync_v1")
    # => :ok

# `enqueue`

```elixir
@spec enqueue(module(), map()) :: {:ok, String.t()} | {:error, term()}
```

Enqueues a background job immediately.

Jobs are single-step flows, so this delegates to `start_flow/2`.

# `enqueue`

```elixir
@spec enqueue(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}
```

Enqueues a background job with options.

Supported options:

  * `:delay_seconds` - non-negative integer seconds before the job is visible
  * `:scheduled_at` - `DateTime` when the job should become visible
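
The two options above can be built from a single "when" value. The following is a minimal sketch, not part of PgFlow: `EnqueueOptsSketch` and `MyApp.Jobs.SendEmail` are hypothetical names, and the behaviour when both options are passed together is not documented here, so the helper only ever emits one of them.

```elixir
defmodule EnqueueOptsSketch do
  @moduledoc false

  # Builds the keyword list for the third argument of enqueue/3.
  # A non-negative integer is treated as a delay in seconds,
  # a DateTime as the moment the job should become visible.
  def build(delay) when is_integer(delay) and delay >= 0, do: {:ok, [delay_seconds: delay]}
  def build(%DateTime{} = at), do: {:ok, [scheduled_at: at]}
  def build(other), do: {:error, {:invalid_schedule, other}}
end

# Hypothetical call site (MyApp.Jobs.SendEmail is an assumed job module):
#
#     {:ok, opts} = EnqueueOptsSketch.build(30)
#     {:ok, run_id} = PgFlow.Client.enqueue(MyApp.Jobs.SendEmail, %{"user_id" => 42}, opts)
```

Rejecting negative integers up front mirrors the documented "non-negative" requirement for `:delay_seconds`.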

# `enqueue_at`

```elixir
@spec enqueue_at(module(), map(), DateTime.t()) ::
  {:ok, String.t()} | {:error, term()}
```

Enqueues a background job that becomes visible at `scheduled_at`.

Timestamps in the past are enqueued for immediate execution.
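
As an illustration, the `scheduled_at` argument can be derived from the current time with `DateTime.add/3`. This is a sketch under assumptions: `ScheduleSketch` and `MyApp.Jobs.SendReminder` are hypothetical names, not part of PgFlow.

```elixir
defmodule ScheduleSketch do
  @moduledoc false

  # Returns the DateTime that lies `minutes` minutes after `now`.
  def minutes_from(%DateTime{} = now, minutes) when is_integer(minutes) do
    DateTime.add(now, minutes * 60, :second)
  end
end

# Hypothetical call site (MyApp.Jobs.SendReminder is an assumed job module):
#
#     at = ScheduleSketch.minutes_from(DateTime.utc_now(), 15)
#     {:ok, run_id} = PgFlow.Client.enqueue_at(MyApp.Jobs.SendReminder, %{"user_id" => 42}, at)
```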

# `enqueue_in`

```elixir
@spec enqueue_in(module(), map(), non_neg_integer()) ::
  {:ok, String.t()} | {:error, term()}
```

Enqueues a background job that becomes visible after `delay_seconds`.

# `flow_exists?`

```elixir
@spec flow_exists?(String.t()) :: {:ok, boolean()} | {:error, term()}
```

Checks if a flow exists in the database.

## Examples

    PgFlow.Client.flow_exists?("acct_123_hubspot_sync_v1")
    # => {:ok, true}
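
One possible use is guarding `delete_flow/1` so that cleanup code can tell a deleted flow from one that was never there. The sketch below is an assumption-laden illustration: `FlowCleanupSketch` is a hypothetical module, and what `delete_flow/1` returns for a missing slug is not stated in this reference. The client module is passed in as an argument so the logic can be exercised with a stub.

```elixir
defmodule FlowCleanupSketch do
  @moduledoc false

  # Deletes the flow only when flow_exists?/1 reports it as present.
  # Returns :ok after a delete, :absent when there was nothing to delete,
  # and passes any {:error, reason} through unchanged.
  def delete_if_exists(slug, client \\ PgFlow.Client) do
    case client.flow_exists?(slug) do
      {:ok, true} -> client.delete_flow(slug)
      {:ok, false} -> :absent
      {:error, _reason} = error -> error
    end
  end
end

# Hypothetical call site:
#
#     FlowCleanupSketch.delete_if_exists("acct_123_hubspot_sync_v1")
```

The check and the delete are two separate calls, so this sketch is not atomic; another process could remove the flow in between.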

# `get_run`

```elixir
@spec get_run(String.t()) ::
  {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found} | {:error, term()}
```

Gets a run by ID.

Returns `{:ok, run}` if found, or `{:error, :not_found}` if the run does not exist.

## Examples

    {:ok, run} = PgFlow.Client.get_run(run_id)
    {:error, :not_found} = PgFlow.Client.get_run("00000000-0000-0000-0000-000000000000")
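
Callers that cannot assume the run exists may prefer to match all three documented return shapes. A minimal sketch, assuming a hypothetical `RunLookupSketch` module that maps the result to an HTTP-style status code:

```elixir
defmodule RunLookupSketch do
  @moduledoc false

  # Translates the documented get_run/1 return shapes into a status code.
  # The {:error, :not_found} clause must come before the generic error clause.
  def to_status({:ok, _run}), do: 200
  def to_status({:error, :not_found}), do: 404
  def to_status({:error, _reason}), do: 500
end

# Hypothetical call site:
#
#     run_id |> PgFlow.Client.get_run() |> RunLookupSketch.to_status()
```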

# `get_run_with_states`

```elixir
@spec get_run_with_states(String.t()) ::
  {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found} | {:error, term()}
```

Gets a run with all step states preloaded.

Returns `{:ok, run}` with the `step_states` association loaded, or
`{:error, :not_found}` if the run does not exist.

## Examples

    {:ok, run} = PgFlow.Client.get_run_with_states(run_id)
    Enum.each(run.step_states, fn state ->
      IO.puts("#{state.step_slug}: #{state.status}")
    end)
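
The preloaded states can also be summarised rather than printed. The sketch below counts states per status with `Enum.frequencies_by/2`; `StepSummarySketch` is a hypothetical helper, and it only assumes each state exposes a `status` field as in the example above.

```elixir
defmodule StepSummarySketch do
  @moduledoc false

  # Returns a map of status => number of step states with that status.
  def count_by_status(step_states) do
    Enum.frequencies_by(step_states, & &1.status)
  end
end

# Hypothetical call site:
#
#     {:ok, run} = PgFlow.Client.get_run_with_states(run_id)
#     StepSummarySketch.count_by_status(run.step_states)
```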

# `start_flow`

```elixir
@spec start_flow(module() | atom() | String.t(), map()) ::
  {:ok, String.t()} | {:error, term()}
```

Starts a flow run with the given input.

Calls the `pgflow.start_flow` SQL function, which handles all initialization:
- Creates run and `step_states` records
- Handles map step `initial_tasks`
- Broadcasts the `run:started` event
- Enqueues ready steps to pgmq
- Handles empty cascades

The flow can be specified by module name or slug atom/string.
Returns `{:ok, run_id}` on success or `{:error, reason}` on failure.

## Examples

    {:ok, run_id} = PgFlow.Client.start_flow(:process_order, %{"order_id" => 123})
    {:ok, run_id} = PgFlow.Client.start_flow("process_order", %{"order_id" => 123})
    {:ok, run_id} = PgFlow.Client.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

# `start_flow_sync`

```elixir
@spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) ::
  {:ok, PgFlow.Schema.Run.t()}
  | {:error, PgFlow.Schema.Run.t()}
  | {:error, :timeout}
  | {:error, term()}
```

Starts a flow and waits for completion.

Blocks until the run finishes or the timeout is reached. Returns `{:ok, run}`
when the run completes, `{:error, run}` when it fails, and `{:error, :timeout}`
if the timeout elapses first.

## Options

  * `:timeout` - Maximum time to wait in milliseconds (default: 60_000)
  * `:poll_interval` - How often to check status in milliseconds (default: 500)

## Examples

    {:ok, run} = PgFlow.Client.start_flow_sync(:my_flow, %{"order_id" => 123})
    {:error, run} = PgFlow.Client.start_flow_sync(:failing_flow, %{})
    {:error, :timeout} = PgFlow.Client.start_flow_sync(:slow_flow, %{}, timeout: 1000)
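
The interaction between `:timeout` and `:poll_interval` can be pictured as a generic poll-until-done loop. The following is an illustration of that pattern only, not PgFlow's actual implementation; `PollSketch` and the `:pending` / `{:done, value}` protocol are assumptions made for the example.

```elixir
defmodule PollSketch do
  @moduledoc false

  # Calls `check` every `interval` ms until it returns {:done, value}
  # or until `timeout` ms have elapsed since the first call.
  def wait(check, timeout, interval) when is_function(check, 0) do
    deadline = System.monotonic_time(:millisecond) + timeout
    loop(check, deadline, interval)
  end

  defp loop(check, deadline, interval) do
    case check.() do
      {:done, value} ->
        {:ok, value}

      :pending ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          Process.sleep(interval)
          loop(check, deadline, interval)
        end
    end
  end
end
```

In a loop of this shape, a shorter poll interval notices completion sooner at the cost of more status checks, and the wait can overshoot the timeout by up to one interval.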

# `upsert_flow`

```elixir
@spec upsert_flow(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}
```

Creates or recompiles a flow definition at runtime.

This is the primary API for runtime flow management. Unlike the compile-time
DSL (`use PgFlow.Flow`), this function creates flow definitions from plain
data - ideal for per-tenant automations where flows are defined dynamically.

If the flow already exists, this operation is destructive: the existing
definition and historical run/task data for the slug are deleted before
recompiling.

## Options

  * `:max_attempts` - Maximum retry attempts (default: 3)
  * `:base_delay` - Base delay between retries in seconds (default: 1)
  * `:timeout` - Step timeout in seconds (default: 60)
  * `:steps` - **Required.** List of step definition maps, each with:
    * `:slug` - Step identifier (required)
    * `:deps` - List of dependency step slugs (default: `[]`)
    * `:step_type` - Step type, e.g. `"single"` (default: `"single"`)
    * `:max_attempts` - Step-level retry override (optional)
    * `:base_delay` - Step-level delay override (optional)
    * `:timeout` - Step-level timeout override (optional)
    * `:start_delay` - Delay before step starts in seconds (optional)

## Examples

    PgFlow.Client.upsert_flow("acct_123_hubspot_sync_v1",
      max_attempts: 3,
      steps: [
        %{slug: "reshape", deps: []},
        %{slug: "create_contact", deps: ["reshape"]}
      ]
    )
    # => {:ok, %{"status" => "compiled", "differences" => []}}
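
Because steps are plain maps, the `:steps` list can be generated from data and checked before calling `upsert_flow/2`. A minimal sketch, assuming a hypothetical `StepDefsSketch` helper that is not part of PgFlow; whether the library itself rejects unknown dependencies is not covered here.

```elixir
defmodule StepDefsSketch do
  @moduledoc false

  # Builds a linear chain of steps: each step depends on the one before it.
  def linear(slugs) when is_list(slugs) do
    slugs
    |> Enum.zip([nil | slugs])
    |> Enum.map(fn
      {slug, nil} -> %{slug: slug, deps: []}
      {slug, previous} -> %{slug: slug, deps: [previous]}
    end)
  end

  # Returns dependency slugs that do not match any step slug in the list.
  def unknown_deps(steps) do
    known = MapSet.new(steps, & &1.slug)

    steps
    |> Enum.flat_map(&Map.get(&1, :deps, []))
    |> Enum.uniq()
    |> Enum.reject(&MapSet.member?(known, &1))
  end
end

# Hypothetical call site:
#
#     steps = StepDefsSketch.linear(["reshape", "create_contact"])
#     [] = StepDefsSketch.unknown_deps(steps)
#     PgFlow.Client.upsert_flow("acct_123_hubspot_sync_v1", steps: steps)
```

Since upserting an existing slug is destructive, validating the generated steps first avoids deleting run history for a definition that would not compile.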

