Represents a compiled flow definition.
A flow definition contains all the information needed to execute a workflow:
- A unique slug identifier
- The module that defined it
- Flow-level configuration options
- An ordered list of steps with their dependencies
Flow definitions are validated at compile time to ensure:
- No circular dependencies exist
- All step dependencies reference existing steps
- Map steps have at most one dependency
Examples
%PgFlow.Flow.Definition{
  slug: :user_onboarding,
  module: MyApp.Flows.UserOnboarding,
  opts: [max_attempts: 3, base_delay: 1000, timeout: 30_000],
  steps: [
    %PgFlow.Flow.Step{slug: :create_account},
    %PgFlow.Flow.Step{slug: :send_welcome_email, depends_on: [:create_account]}
  ]
}
Types
@type flow_type() :: :flow | :job
@type t() :: %PgFlow.Flow.Definition{
  flow_type: flow_type(),
  module: module(),
  opts: keyword(),
  slug: atom(),
  steps: [PgFlow.Flow.Step.t()]
}
Functions
@spec build_steps([atom() | {atom(), keyword()}]) :: [PgFlow.Flow.Step.t()]
Converts raw step tuples from the DSL into Step structs.
Examples
iex> PgFlow.Flow.Definition.build_steps([:fetch_user, {:send_email, depends_on: [:fetch_user]}])
[
  %PgFlow.Flow.Step{slug: :fetch_user, step_type: :single, depends_on: []},
  %PgFlow.Flow.Step{slug: :send_email, step_type: :single, depends_on: [:fetch_user]}
]
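The conversion above can be sketched as follows. This is a minimal illustration, not PgFlow's actual implementation: the `BuildStepsSketch` module and its nested `Step` struct are hypothetical stand-ins, and reading a `:step_type` option from the keyword list is an assumption (the source only shows `:depends_on`).

```elixir
defmodule BuildStepsSketch do
  # Hypothetical stand-in for PgFlow.Flow.Step; the real struct may have more fields.
  defmodule Step do
    defstruct slug: nil, step_type: :single, depends_on: []
  end

  # Normalizes each raw DSL entry into a Step struct, preserving order.
  def build_steps(raw_steps), do: Enum.map(raw_steps, &build_step/1)

  # A bare atom becomes a step with default options.
  defp build_step(slug) when is_atom(slug), do: %Step{slug: slug}

  # A {slug, opts} tuple overrides the defaults with the given options.
  # Assumption: the step type is passed under the :step_type key.
  defp build_step({slug, opts}) when is_atom(slug) and is_list(opts) do
    %Step{
      slug: slug,
      step_type: Keyword.get(opts, :step_type, :single),
      depends_on: Keyword.get(opts, :depends_on, [])
    }
  end
end
```

Calling `BuildStepsSketch.build_steps([:fetch_user, {:send_email, depends_on: [:fetch_user]}])` yields two structs in the same order as the input, mirroring the shape of the example above.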
slug_to_string/1
Converts a flow slug (atom) to a string for database storage.
Examples
iex> PgFlow.Flow.Definition.slug_to_string(:user_onboarding)
"user_onboarding"
iex> PgFlow.Flow.Definition.slug_to_string(:payment_processing)
"payment_processing"
validate/1
Validates a flow definition.
Performs the following checks:
- No circular dependencies exist in the step graph
- All step dependencies reference existing steps
- Map steps have at most one dependency
Returns {:ok, definition} if valid, or {:error, reason} if invalid.
Examples
iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step1},
...>     %PgFlow.Flow.Step{slug: :step2, depends_on: [:step1]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate(definition)
{:ok, definition}
iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step1, depends_on: [:step2]},
...>     %PgFlow.Flow.Step{slug: :step2, depends_on: [:step1]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate(definition)
{:error, "Circular dependency detected in flow :test_flow"}
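One way to chain several checks so that the first failure is returned is sketched below. The `ValidatePipelineSketch` module is hypothetical, and it assumes each individual check returns `:ok` on success and `{:error, reason}` on failure; the source does not state the success value of the individual validators, nor the order in which PgFlow runs them.

```elixir
defmodule ValidatePipelineSketch do
  # Runs each check in order and stops at the first {:error, reason}.
  # Assumption: a check is a one-argument function returning :ok or {:error, reason}.
  def validate(definition, checks) do
    Enum.reduce_while(checks, {:ok, definition}, fn check, _acc ->
      case check.(definition) do
        :ok -> {:cont, {:ok, definition}}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end
end

# Placeholder checks, used only to show the control flow.
always_ok = fn _definition -> :ok end
always_fail = fn _definition -> {:error, "boom"} end

definition = %{slug: :test_flow, steps: []}
ValidatePipelineSketch.validate(definition, [always_ok, always_ok])
ValidatePipelineSketch.validate(definition, [always_ok, always_fail, always_ok])
```

The first call passes every check and returns the definition wrapped in `{:ok, ...}`; the second halts at `always_fail` and never runs the check after it.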
validate_dependencies_exist/1
Validates that all step dependencies reference existing steps.
Examples
iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step1, depends_on: [:nonexistent]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate_dependencies_exist(definition)
{:error, "Step :step1 in flow :test_flow depends on non-existent step :nonexistent"}
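A check of this kind could be written roughly as follows. This is a sketch under assumptions: `DependenciesExistSketch` is a hypothetical module, it works on any map or struct with `:slug` and `:steps` keys, and returning `:ok` on success is a guess, since the source only shows the error case.

```elixir
defmodule DependenciesExistSketch do
  def validate_dependencies_exist(%{slug: flow_slug, steps: steps}) do
    # Collect every declared step slug for constant-time membership checks.
    known = MapSet.new(steps, & &1.slug)

    # Find the first {step, dependency} pair whose dependency is not a known slug.
    missing =
      Enum.find_value(steps, fn step ->
        Enum.find_value(step.depends_on, fn dep ->
          if MapSet.member?(known, dep), do: nil, else: {step.slug, dep}
        end)
      end)

    case missing do
      # Assumption: success is reported as a bare :ok.
      nil ->
        :ok

      {step_slug, dep} ->
        {:error,
         "Step #{inspect(step_slug)} in flow #{inspect(flow_slug)} " <>
           "depends on non-existent step #{inspect(dep)}"}
    end
  end
end
```

Only the first missing dependency is reported, which matches the single-message shape of the error shown above.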
validate_map_step_constraints/1
Validates that map steps have at most one dependency.
Map steps process arrays from a single source, so they can depend on at most one step.
Examples
iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{
...>       slug: :process_items,
...>       step_type: :map,
...>       depends_on: [:step1, :step2]
...>     }
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate_map_step_constraints(definition)
{:error, "Map step :process_items in flow :test_flow must have exactly one dependency (the array source), but has 2"}
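The constraint can be sketched with a single pattern match. `MapStepConstraintsSketch` is a hypothetical module, the `:ok` success value is an assumption, and a map step with zero dependencies is treated as valid here, following the "at most one" wording.

```elixir
defmodule MapStepConstraintsSketch do
  def validate_map_step_constraints(%{slug: flow_slug, steps: steps}) do
    # A map step violates the rule when depends_on holds two or more entries.
    offending =
      Enum.find(steps, fn step ->
        match?(%{step_type: :map, depends_on: [_, _ | _]}, step)
      end)

    case offending do
      # Assumption: success is reported as a bare :ok.
      nil ->
        :ok

      step ->
        {:error,
         "Map step #{inspect(step.slug)} in flow #{inspect(flow_slug)} " <>
           "must have exactly one dependency (the array source), " <>
           "but has #{length(step.depends_on)}"}
    end
  end
end
```

Steps of other types are ignored by the pattern, so a `:single` step may list any number of dependencies without tripping this check.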
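validate_no_cycles/1
Validates that the flow has no circular dependencies.
Uses Kahn's algorithm for topological sorting to detect cycles. The algorithm repeatedly removes steps whose dependencies have all been processed; if some steps can never be removed, they are part of a cycle or depend on one.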
Examples
iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :a, depends_on: [:b]},
...>     %PgFlow.Flow.Step{slug: :b, depends_on: [:a]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate_no_cycles(definition)
{:error, "Circular dependency detected in flow :test_flow"}
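Kahn's algorithm, as applied to a step graph, can be sketched like this. `KahnSketch` is a hypothetical module and not PgFlow's code; it assumes every dependency refers to an existing step (a missing one would be reported as a cycle here) and that success is signalled with `:ok`.

```elixir
defmodule KahnSketch do
  def validate_no_cycles(%{slug: flow_slug, steps: steps}) do
    # In-degree of a step = number of distinct dependencies it is still waiting on.
    in_degree =
      Map.new(steps, fn step -> {step.slug, length(Enum.uniq(step.depends_on))} end)

    # Start with the steps that have no dependencies at all.
    ready = for {slug, 0} <- in_degree, do: slug

    # If every step could be ordered, the graph is acyclic.
    if drain(ready, in_degree, steps, 0) == length(steps) do
      :ok
    else
      {:error, "Circular dependency detected in flow #{inspect(flow_slug)}"}
    end
  end

  # Returns how many steps could be placed in topological order.
  defp drain([], _in_degree, _steps, visited), do: visited

  defp drain([current | rest], in_degree, steps, visited) do
    # Every step depending on `current` has one fewer unmet dependency.
    dependents = for step <- steps, current in step.depends_on, do: step.slug

    {in_degree, newly_ready} =
      Enum.reduce(dependents, {in_degree, []}, fn slug, {degrees, acc} ->
        degrees = Map.update!(degrees, slug, &(&1 - 1))
        if degrees[slug] == 0, do: {degrees, [slug | acc]}, else: {degrees, acc}
      end)

    drain(rest ++ newly_ready, in_degree, steps, visited + 1)
  end
end
```

With the two-step cycle from the example above, no step starts with an in-degree of zero, so nothing is ever processed and the count of ordered steps (0) falls short of the total (2). The sketch scans all steps for each processed step, which is quadratic in the number of steps; that is likely acceptable for small flows, and an adjacency map would avoid it.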