# `PgFlow.Queries.Flows`
[🔗](https://github.com/agoodway/pgflow/blob/v0.1.0/lib/pgflow/queries/flows.ex#L1)

SQL query interface for pgflow flow operations.

Provides functions for starting flows, completing/failing tasks, reading
messages, and managing flow lifecycle. All functions that accept JSON data
expect Elixir terms that will be encoded with Jason.

# `prune_result`

```elixir
@type prune_result() :: %{
  deleted_runs: non_neg_integer(),
  deleted_step_states: non_neg_integer(),
  deleted_step_tasks: non_neg_integer(),
  deleted_workers: non_neg_integer()
}
```

Result of pruning old run data.

# `compile_flow`

```elixir
@spec compile_flow(Ecto.Repo.t(), String.t(), map(), [map()]) ::
  {:ok, term()} | {:error, term()}
```

Compiles and upserts a flow definition.

## Parameters

  * `repo` - The Ecto repository
  * `slug` - The flow identifier slug
  * `opts` - Flow options map (e.g., `%{max_retries: 3}`)
  * `steps` - List of step definitions as maps

# `complete_task`

```elixir
@spec complete_task(
  Ecto.Repo.t(),
  String.t(),
  String.t(),
  non_neg_integer(),
  map() | list()
) ::
  {:ok, term()} | {:error, term()}
```

Marks a task as completed with output data.

## Parameters

  * `repo` - The Ecto repository
  * `run_id` - The flow run UUID
  * `step_slug` - The step identifier slug
  * `task_index` - The task index (0-based)
  * `output` - Output data as an Elixir term (will be encoded as JSONB)

## Returns

  * `{:ok, result}` - Success result from the database
  * `{:error, reason}` - Error details if the operation fails

# `delay_run`

```elixir
@spec delay_run(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer()) ::
  :ok | {:error, term()}
```

Delays the first queued task for a flow run by moving its pgmq visibility time.

This is a lower-level helper for public APIs such as `PgFlow.enqueue_in/3`
and `PgFlow.enqueue_at/3`. Call it in the same repository transaction as
`start_flow/3` when callers need to ensure workers cannot see the task before
the delay is applied.
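
A minimal sketch of that transactional pattern follows. The module name and the `queries` parameter are hypothetical (the latter only makes the sketch easy to stub). The argument order for `delay_run/4` (flow slug, run ID, delay) and the delay unit are assumptions read off the typespec; this page does not confirm them.

```elixir
defmodule DelayedStartSketch do
  # Hypothetical helper, not part of pgflow.
  # ASSUMPTION: delay_run/4 takes (repo, flow_slug, run_id, delay). The spec
  # above only gives the types (String.t(), String.t(), non_neg_integer()).
  def start_delayed(repo, flow_slug, input, delay, queries \\ PgFlow.Queries.Flows) do
    # Both calls run inside one Ecto transaction, so the queued task only
    # becomes visible to workers after the delay has been applied.
    repo.transaction(fn ->
      with {:ok, run_id} <- queries.start_flow(repo, flow_slug, input),
           :ok <- queries.delay_run(repo, flow_slug, run_id, delay) do
        run_id
      else
        # Roll back so a run is never committed without its delay.
        {:error, reason} -> repo.rollback(reason)
      end
    end)
  end
end
```

With a real Ecto repo this returns `{:ok, run_id}` when both steps succeed and `{:error, reason}` when either step fails and the transaction is rolled back.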

# `delete_flow`

```elixir
@spec delete_flow(Ecto.Repo.t(), String.t()) :: :ok | {:error, term()}
```

Deletes a flow and all associated data (runs, step states, step tasks, queue).

## Parameters

  * `repo` - The Ecto repository
  * `slug` - The flow identifier slug

## Returns

  * `:ok` on success (including when the flow doesn't exist)
  * `{:error, term()}` on failure

# `delete_message`

```elixir
@spec delete_message(Ecto.Repo.t(), String.t(), pos_integer()) ::
  {:ok, boolean()} | {:error, term()}
```

Deletes a message from a pgmq queue.

# `fail_task`

```elixir
@spec fail_task(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer(), String.t()) ::
  {:ok, term()} | {:error, term()}
```

Marks a task as failed with an error message.

## Parameters

  * `repo` - The Ecto repository
  * `run_id` - The flow run UUID
  * `step_slug` - The step identifier slug
  * `task_index` - The task index (0-based)
  * `error_message` - Error description string

## Returns

  * `{:ok, result}` - Success result from the database
  * `{:error, reason}` - Error details if the operation fails
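
As an illustration of how `complete_task/5` and `fail_task/5` pair up, the sketch below runs a step handler and reports its outcome. The module, the `handler` function, and the `queries` parameter are hypothetical; only the two pgflow calls and their argument order come from the specs on this page.

```elixir
defmodule TaskReportSketch do
  # Hypothetical helper, not part of pgflow.
  # `handler` is any zero-arity function returning {:ok, output} or
  # {:error, reason}; `output` should be a map or list, as complete_task/5 expects.
  def run_and_report(repo, run_id, step_slug, task_index, handler,
        queries \\ PgFlow.Queries.Flows) do
    case handler.() do
      {:ok, output} ->
        queries.complete_task(repo, run_id, step_slug, task_index, output)

      {:error, reason} ->
        # fail_task/5 expects a string, so non-string reasons are inspected.
        message = if is_binary(reason), do: reason, else: inspect(reason)
        queries.fail_task(repo, run_id, step_slug, task_index, message)
    end
  end
end
```

Either branch returns whatever the underlying query returns, so callers can still match on `{:ok, result}` or `{:error, reason}`.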

# `flow_exists?`

```elixir
@spec flow_exists?(Ecto.Repo.t(), String.t()) :: {:ok, boolean()} | {:error, term()}
```

Checks if a flow exists in the database.

# `get_flow_input`

```elixir
@spec get_flow_input(Ecto.Repo.t(), String.t()) ::
  {:ok, map() | list()} | {:error, term()}
```

Retrieves the input data for a flow run.

# `get_run`

```elixir
@spec get_run(Ecto.Repo.t(), String.t()) ::
  {:ok, %{status: String.t(), output: term()}} | {:error, :not_found | term()}
```

Retrieves a flow run's current state.

## Parameters

  * `repo` - The Ecto repository
  * `run_id` - The flow run UUID

## Returns

  * `{:ok, %{status: String.t(), output: term()}}` - Run state
  * `{:error, :not_found}` - Run does not exist
  * `{:error, reason}` - Error details if the operation fails
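
The three return shapes can be handled with a single `case`. This is a sketch only: the module and the `queries` parameter are hypothetical, and no particular status values are assumed.

```elixir
defmodule RunLookupSketch do
  # Hypothetical helper, not part of pgflow.
  # Turns each documented return shape of get_run/2 into a readable string.
  def describe(repo, run_id, queries \\ PgFlow.Queries.Flows) do
    case queries.get_run(repo, run_id) do
      {:ok, %{status: status, output: output}} ->
        "run #{run_id} is #{status}, output: #{inspect(output)}"

      # Match :not_found before the generic error clause.
      {:error, :not_found} ->
        "run #{run_id} does not exist"

      {:error, reason} ->
        "lookup failed: #{inspect(reason)}"
    end
  end
end
```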

# `get_step_output`

```elixir
@spec get_step_output(Ecto.Repo.t(), String.t(), String.t()) ::
  {:ok, map() | nil} | {:error, term()}
```

Retrieves the output for a specific step in a flow run.

# `prune_data`

```elixir
@spec prune_data(Ecto.Repo.t(), pos_integer(), keyword()) ::
  {:ok, prune_result()} | {:error, term()}
```

Prunes flow run data older than the specified retention period.

## Options

  * `:flow_slugs` - List of flow slugs to prune (default: all flows)

# `read`

```elixir
@spec read(Ecto.Repo.t(), String.t(), pos_integer(), pos_integer()) ::
  {:ok, [list()]} | {:error, term()}
```

Reads messages from a queue without blocking.

Uses `pgmq.read()` to fetch available messages and returns immediately, whether
or not any are available. Returned messages become invisible for the visibility
timeout period to prevent duplicate processing.

Queue poll SQL logging is disabled by default because workers call this
frequently. Set `config :pgflow, :log_queue_polls, true` to enable Ecto query
logging for these reads while debugging.

## Parameters

  * `repo` - The Ecto repository
  * `queue_name` - The name of the queue to read from (the same as the flow slug)
  * `visibility_timeout` - Time in seconds messages remain invisible
  * `batch_size` - Maximum number of messages to retrieve

## Returns

  * `{:ok, messages}` - List of message records from pgmq (may be empty)
  * `{:error, reason}` - Error details if the operation fails

# `recover_stalled_tasks`

```elixir
@spec recover_stalled_tasks(Ecto.Repo.t(), pos_integer()) ::
  {:ok, non_neg_integer()} | {:error, term()}
```

Recovers stalled tasks by resetting `step_tasks` rows stuck in `started` status.

Tasks that have been in `started` status longer than the stale threshold
are reset to `queued` so they can be re-processed by workers.

# `start_flow`

```elixir
@spec start_flow(Ecto.Repo.t(), String.t(), map() | list()) ::
  {:ok, String.t()} | {:error, term()}
```

Starts a new flow execution.

## Parameters

  * `repo` - The Ecto repository
  * `flow_slug` - The flow identifier slug
  * `input` - Input data as an Elixir term (will be encoded as JSONB)

## Returns

  * `{:ok, run_id}` - The UUID of the created flow run
  * `{:error, reason}` - Error details if the operation fails

# `start_tasks`

```elixir
@spec start_tasks(Ecto.Repo.t(), String.t(), [pos_integer()], String.t()) ::
  {:ok, [list()]} | {:error, term()}
```

Starts multiple tasks by marking messages as in-progress.

## Parameters

  * `repo` - The Ecto repository
  * `flow_slug` - The flow identifier slug
  * `msg_ids` - List of message IDs from pgmq
  * `worker_id` - The worker UUID string

## Returns

  * `{:ok, task_details}` - List of task detail records
  * `{:error, reason}` - Error details if the operation fails
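
One way `read/4` and `start_tasks/4` might be combined in a single poll is sketched below. The module and the `queries` parameter are hypothetical, the timeout and batch size are arbitrary example values, and the position of the message ID within each row is an assumption that this page does not confirm.

```elixir
defmodule PollSketch do
  # Hypothetical helper, not part of pgflow.
  def poll_once(repo, flow_slug, worker_id, queries \\ PgFlow.Queries.Flows) do
    # 30-second visibility timeout and a batch of 10 are example values only.
    with {:ok, messages} <- queries.read(repo, flow_slug, 30, 10) do
      # ASSUMPTION: each message row is a list whose first element is the
      # pgmq message ID. Check the actual row layout before relying on this.
      case Enum.map(messages, &hd/1) do
        # Nothing to claim, so skip the second query.
        [] -> {:ok, []}
        msg_ids -> queries.start_tasks(repo, flow_slug, msg_ids, worker_id)
      end
    end
  end
end
```

The queue name passed to `read/4` is the flow slug, which is why the same value is reused for `start_tasks/4`.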

# `upsert_flow`

```elixir
@spec upsert_flow(Ecto.Repo.t(), String.t(), map(), [map()]) ::
  {:ok, map()} | {:error, term()}
```

Recompiles a flow definition from runtime options.

Uses `create_flow` and `add_step` (the proven low-level SQL functions) to
register a flow. If the flow already exists, it is dropped and re-created
to ensure the definition matches.

This operation is destructive for existing flows: all historical run and
task data for the slug is deleted before recompiling.

## Parameters

  * `repo` - The Ecto repository
  * `slug` - The flow identifier slug
  * `opts` - Flow-level options map with keys: `"max_attempts"`, `"base_delay"`, `"timeout"`
  * `steps` - List of step definition maps with keys: `"slug"`, `"deps"`, `"step_type"`,
    and optional `"max_attempts"`, `"base_delay"`, `"timeout"`, `"start_delay"`

## Returns

  * `{:ok, %{"status" => status}}` where status is `"compiled"` or `"recompiled"`
  * `{:error, term()}` on failure
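
To illustrate the string-keyed maps described above, the sketch below builds a two-step definition and passes it to `upsert_flow/4`. The module, the flow and step slugs, all numeric values, and the `queries` parameter are hypothetical. The `"single"` step type is an assumption, since this page does not list the allowed `"step_type"` values or the units of the delay and timeout options.

```elixir
defmodule FlowDefinitionSketch do
  # Hypothetical helper, not part of pgflow.
  # Builds a step map with the three required string keys; `extra` carries
  # optional keys such as "max_attempts" or "start_delay".
  # ASSUMPTION: "single" is a valid "step_type" value.
  def step(slug, deps \\ [], extra \\ %{}) do
    Map.merge(%{"slug" => slug, "deps" => deps, "step_type" => "single"}, extra)
  end

  # Registers an example flow. Because upsert_flow/4 is destructive for an
  # existing slug, this is only safe when losing the run history is acceptable.
  def register(repo, queries \\ PgFlow.Queries.Flows) do
    opts = %{"max_attempts" => 3, "base_delay" => 5, "timeout" => 60}

    steps = [
      step("fetch"),
      step("summarize", ["fetch"], %{"max_attempts" => 5})
    ]

    queries.upsert_flow(repo, "example_flow", opts, steps)
  end
end
```

On success the result can be matched as `{:ok, %{"status" => status}}`, with `status` being `"compiled"` or `"recompiled"` as documented above.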

# `valid_slug?`

```elixir
@spec valid_slug?(Ecto.Repo.t(), String.t()) :: {:ok, boolean()} | {:error, term()}
```

Checks if a slug is valid according to core pgflow rules.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
