PgFlow.FlowCompiler (PgFlow v0.1.0)

Copy Markdown View Source

Compiles flow definitions into SQL statements.

This module transforms PgFlow.Flow.Definition structs into executable SQL that registers flows and steps in the pgflow database schema. The generated SQL is intended to be run inside Ecto migrations.

Generated SQL

For each flow, the compiler generates:

  1. A SELECT pgflow.create_flow(...) call to create the flow record and PGMQ queue
  2. A SELECT pgflow.add_step(...) call for each step in the flow

Example

definition = MyApp.Flows.ArticleFlow.__pgflow_definition__()
sql_statements = PgFlow.FlowCompiler.compile(definition)

# Returns:
# [
#   "SELECT pgflow.create_flow('article_flow', 3, 5, 120)",
#   "SELECT pgflow.add_step('article_flow', 'fetch_article', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')",
#   ...
# ]

Summary

Functions

Generates the SQL to add a step to a flow.

Compiles a flow definition into a list of SQL statements.

Generates the SQL to create a flow.

Generates the SQL to schedule a flow/job with pg_cron.

Generates the SQL to unschedule a flow/job from pg_cron.

Checks if the flow module has a cron expression configured.

Functions

add_step_sql(flow_slug, step)

@spec add_step_sql(atom(), PgFlow.Flow.Step.t()) :: String.t()

Generates the SQL to add a step to a flow.

Parameters

Returns

  • A SQL string for adding the step

compile(definition)

@spec compile(PgFlow.Flow.Definition.t()) :: [String.t()]

Compiles a flow definition into a list of SQL statements.

Returns a list of SQL strings that, when executed in order, will register the flow and all its steps in the database.

Parameters

Returns

  • A list of SQL statement strings

Example

iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   opts: [max_attempts: 3, base_delay: 5, timeout: 60],
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step_a},
...>     %PgFlow.Flow.Step{slug: :step_b, depends_on: [:step_a]}
...>   ]
...> }
iex> PgFlow.FlowCompiler.compile(definition)
[
  "SELECT pgflow.create_flow('test_flow', 3, 5, 60)",
  "SELECT pgflow.add_step('test_flow', 'step_a', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')",
  "SELECT pgflow.add_step('test_flow', 'step_b', ARRAY['step_a']::text[], NULL, NULL, NULL, NULL, 'single')"
]

create_flow_sql(definition)

@spec create_flow_sql(PgFlow.Flow.Definition.t()) :: String.t()

Generates the SQL to create a flow.

Parameters

Returns

  • A SQL string for creating the flow

cron_schedule_sql(slug, expression, input)

@spec cron_schedule_sql(atom(), String.t(), map()) :: String.t()

Generates the SQL to schedule a flow/job with pg_cron.

Parameters

  • slug - The flow/job slug atom
  • expression - The cron expression string (e.g., "0 ")
  • input - The input map to pass to the flow/job

Returns

  • A SQL string for scheduling the cron job

cron_unschedule_sql(slug)

@spec cron_unschedule_sql(atom()) :: String.t()

Generates the SQL to unschedule a flow/job from pg_cron.

Parameters

  • slug - The flow/job slug atom

Returns

  • A SQL string for unscheduling the cron job

has_cron?(module)

@spec has_cron?(module()) :: boolean()

Checks if the flow module has a cron expression configured.

Parameters

  • module - The flow module

Returns

  • true if the module has a cron expression, false otherwise