BullMQ.FlowProducer (BullMQ v1.0.1)

View Source

Create job flows with parent-child dependencies.

The FlowProducer allows you to create complex job hierarchies where parent jobs wait for their children to complete before being processed.

All flow operations are atomic - either all jobs in a flow are added, or none are. This is achieved using Redis MULTI/EXEC transactions.

Usage

# Create a flow with parent and children
{:ok, flow} = BullMQ.FlowProducer.add(%{
  name: "process_order",
  queue_name: "orders",
  data: %{order_id: 123},
  children: [
    %{name: "validate", queue_name: "validation", data: %{order_id: 123}},
    %{name: "check_inventory", queue_name: "inventory", data: %{order_id: 123}},
    %{name: "process_payment", queue_name: "payments", data: %{order_id: 123}}
  ]
}, connection: :redis)

# The parent job will be processed only after all children complete

Flow Structure

A flow is defined as a tree of jobs:

%{
  name: "parent_job",
  queue_name: "main_queue",
  data: %{...},
  opts: %{...},
  children: [
    %{name: "child1", queue_name: "queue1", data: %{...}},
    %{
      name: "child2",
      queue_name: "queue2",
      data: %{...},
      children: [
        %{name: "grandchild", queue_name: "queue3", data: %{...}}
      ]
    }
  ]
}

Accessing Child Results

When a parent job is processed, use methods on BullMQ.Job to access children:

def process(%Job{} = job) do
  # Get children results
  {:ok, children_values} = BullMQ.Job.get_children_values(job)

  # Get ignored failures (when using ignore_dependency_on_failure)
  {:ok, ignored} = BullMQ.Job.get_ignored_children_failures(job)

  # Get pending dependencies
  {:ok, deps} = BullMQ.Job.get_dependencies(job)

  {:ok, process_with_children(job.data, children_values)}
end

Failure Handling

By default, if a child fails, the parent will also fail. You can control this behavior with options:

  • :fail_parent_on_failure - If false, parent continues even if children fail
  • :ignore_dependency_on_failure - Ignore failed child and continue

Summary

Functions

Adds a job flow to the queue atomically.

Adds multiple job flows atomically.

Types

flow_node()

@type flow_node() :: %{
  :name => String.t(),
  :queue_name => String.t(),
  optional(:data) => term(),
  optional(:opts) => map(),
  optional(:children) => [flow_node()]
}

flow_result()

@type flow_result() :: %{job: BullMQ.Job.t(), children: [flow_result()]}

Functions

add(flow, opts \\ [])

@spec add(
  flow_node(),
  keyword()
) :: {:ok, flow_result()} | {:error, term()}

Adds a job flow to the queue atomically.

Creates the entire job hierarchy, with children being processed before their parent. The entire flow is added atomically using Redis MULTI/EXEC - either all jobs are added or none are.

Parameters

  • flow - Flow definition (see module docs)
  • opts - Connection and other options

Options

  • :connection - Redis connection (required)
  • :prefix - Queue prefix (default: "bull")

Returns

Returns {:ok, flow_result} with the created job hierarchy.

add_bulk(flows, opts \\ [])

@spec add_bulk(
  [flow_node()],
  keyword()
) :: {:ok, [flow_result()]} | {:error, term()}

Adds multiple job flows atomically.

All flows are added in a single transaction - either all succeed or none do. This matches the Node.js FlowProducer.addBulk behavior.

Examples

flows = [
  %{name: "job1", queue_name: "q1", data: %{}, children: [...]},
  %{name: "job2", queue_name: "q2", data: %{}, children: [...]}
]

{:ok, results} = BullMQ.FlowProducer.add_bulk(flows, connection: :redis)