PgFlow.Config (PgFlow v0.1.0)

Copy Markdown View Source

Configuration validation and management for PgFlow.

Uses NimbleOptions to validate configuration options passed to PgFlow.start_link/1.

Options

  • :repo (required) - The Ecto repository module to use for database operations.
  • :flows (optional) - List of flow modules to automatically register on startup. Default: [].
  • :jobs (optional) - List of job modules to automatically register on startup. Default: [].
  • :worker_name (optional) - Human-readable name for workers in logs. Default: auto-generated from flow slug.
  • :max_concurrency (optional) - Maximum number of parallel tasks per worker. Default: 10.
  • :batch_size (optional) - Number of messages to fetch per poll cycle. Default: 10.
  • :signal_strategy (optional) - Signal detection strategy. :polling (adaptive backoff) or :notify (LISTEN/NOTIFY). Default: :polling.
  • :min_poll_interval (optional) - Minimum milliseconds between polls (fastest rate). Default: 1000. Max: 300_000 (5 minutes).
  • :max_poll_interval (optional) - Maximum milliseconds between polls (slowest rate during backoff). Default: 5000. Max: 300_000 (5 minutes).
  • :notify_fallback_interval (optional) - Milliseconds between fallback polls when using :notify strategy. Default: 30000. Max: 600_000 (10 minutes).
  • :notify_throttle_ms (optional) - Throttle interval for pgmq LISTEN/NOTIFY trigger. Default: 250.
  • :recovery_interval (optional) - Milliseconds between stalled task recovery sweeps. Default: 15_000.
  • :stale_threshold (optional) - Seconds after which a started task is considered stalled. Default: 60.
  • :attach_default_logger (optional) - Whether to attach the default telemetry logger. Default: false.
  • :pubsub (optional) - Phoenix.PubSub module for broadcasting telemetry events to LiveViews. Default: nil (disabled).

Interval Constraints

  • min_poll_interval must be <= max_poll_interval
  • max_poll_interval must not exceed 5 minutes (300,000 ms)
  • notify_fallback_interval must not exceed 10 minutes (600,000 ms)

Examples

config = PgFlow.Config.validate!(
  repo: MyApp.Repo,
  flows: [MyApp.Flows.ProcessOrder],
  max_concurrency: 20,
  signal_strategy: :notify
)

Summary

Functions

Returns the NimbleOptions schema for PgFlow configuration.

Validates the given configuration options.

Functions

schema()

@spec schema() :: keyword()

Returns the NimbleOptions schema for PgFlow configuration.

validate!(opts)

@spec validate!(keyword()) :: keyword()

Validates the given configuration options.

Raises ArgumentError if the configuration is invalid.

Examples

iex> PgFlow.Config.validate!(repo: MyApp.Repo)
[repo: MyApp.Repo, flows: [], max_concurrency: 10, ...]

iex> PgFlow.Config.validate!(flows: [MyFlow])
** (ArgumentError) required :repo option not found