BullMQ.FlowProducer (BullMQ v1.0.1)
Create job flows with parent-child dependencies.
The FlowProducer allows you to create complex job hierarchies where parent jobs wait for their children to complete before being processed.
All flow operations are atomic - either all jobs in a flow are added, or none are. This is achieved using Redis MULTI/EXEC transactions.
Usage
    # Create a flow with parent and children
    {:ok, flow} = BullMQ.FlowProducer.add(%{
      name: "process_order",
      queue_name: "orders",
      data: %{order_id: 123},
      children: [
        %{name: "validate", queue_name: "validation", data: %{order_id: 123}},
        %{name: "check_inventory", queue_name: "inventory", data: %{order_id: 123}},
        %{name: "process_payment", queue_name: "payments", data: %{order_id: 123}}
      ]
    }, connection: :redis)

    # The parent job will be processed only after all children complete
Flow Structure
A flow is defined as a tree of jobs:
    %{
      name: "parent_job",
      queue_name: "main_queue",
      data: %{...},
      opts: %{...},
      children: [
        %{name: "child1", queue_name: "queue1", data: %{...}},
        %{
          name: "child2",
          queue_name: "queue2",
          data: %{...},
          children: [
            %{name: "grandchild", queue_name: "queue3", data: %{...}}
          ]
        }
      ]
    }
Accessing Child Results
When a parent job is processed, use the functions in BullMQ.Job to access its children:
    # Assumes `alias BullMQ.Job` in the enclosing module
    def process(%Job{} = job) do
      # Get children results
      {:ok, children_values} = BullMQ.Job.get_children_values(job)

      # Get ignored failures (when using ignore_dependency_on_failure)
      {:ok, ignored} = BullMQ.Job.get_ignored_children_failures(job)

      # Get pending dependencies
      {:ok, deps} = BullMQ.Job.get_dependencies(job)

      # process_with_children/2 is an application-defined function
      {:ok, process_with_children(job.data, children_values)}
    end
Failure Handling
By default, if a child fails, the parent will also fail. You can control this behavior with options:
:fail_parent_on_failure - If false, parent continues even if children fail
:ignore_dependency_on_failure - Ignore failed child and continue
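As a rough sketch of how such an option might be attached, the snippet below builds a flow definition in which one child may fail without blocking the parent. It assumes the option is set in the child's opts map (the opts: key shown under Flow Structure). The module FlowOptionsSketch, the helper tolerant_child/3, and the "send_receipt" / "notifications" names are hypothetical. Nothing here talks to Redis; it only constructs the data.

```elixir
defmodule FlowOptionsSketch do
  # Hypothetical helper: builds a child node whose failure should be ignored.
  # Assumption: failure options are read from the child's `opts` map.
  def tolerant_child(name, queue_name, data) do
    %{
      name: name,
      queue_name: queue_name,
      data: data,
      opts: %{ignore_dependency_on_failure: true}
    }
  end

  # Builds a parent with one strict child and one tolerant child.
  def order_flow(order_id) do
    data = %{order_id: order_id}

    %{
      name: "process_order",
      queue_name: "orders",
      data: data,
      children: [
        %{name: "process_payment", queue_name: "payments", data: data},
        tolerant_child("send_receipt", "notifications", data)
      ]
    }
  end
end
```

The resulting map could then be passed to BullMQ.FlowProducer.add/2 as in the Usage example.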
Summary
Types
@type flow_result() :: %{job: BullMQ.Job.t(), children: [flow_result()]}
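Because flow_result() is recursive, a small recursive function is enough to walk it. The sketch below flattens a result tree into a list of jobs, parent first. The module name FlowResultSketch is hypothetical, and plain maps stand in for BullMQ.Job.t() so the snippet runs without Redis. It also tolerates a missing or nil children key, which is an assumption about leaf nodes rather than documented behavior.

```elixir
defmodule FlowResultSketch do
  # Collects every job in a flow_result tree: the node's own job first,
  # then the jobs of each child subtree, depth-first.
  def jobs(%{job: job} = node) do
    children = Map.get(node, :children) || []
    [job | Enum.flat_map(children, &jobs/1)]
  end
end

# Plain maps stand in for BullMQ.Job.t() in this illustration.
result = %{
  job: %{name: "process_order"},
  children: [
    %{job: %{name: "validate"}, children: []},
    %{job: %{name: "check_inventory"}, children: []}
  ]
}

names = result |> FlowResultSketch.jobs() |> Enum.map(& &1.name)
# names == ["process_order", "validate", "check_inventory"]
```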
Functions
@spec add(flow_node(), keyword()) :: {:ok, flow_result()} | {:error, term()}
Adds a job flow to the queue atomically.
Creates the entire job hierarchy, with children being processed before their parent. The entire flow is added atomically using Redis MULTI/EXEC - either all jobs are added or none are.
Parameters
flow - Flow definition (see module docs)
opts - Connection and other options
Options
:connection - Redis connection (required)
:prefix - Queue prefix (default: "bull")
Returns
Returns {:ok, flow_result} with the created job hierarchy, or {:error, reason} if the flow could not be added.
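Since the spec allows both an :ok and an :error tuple, callers may prefer to match on both rather than assert success. A minimal sketch, assuming only the return shapes given in the spec; the module FlowAddSketch and its describe/1 function are hypothetical:

```elixir
defmodule FlowAddSketch do
  # Success: the result carries the parent job and its direct children.
  def describe({:ok, %{children: children}}) do
    "added flow with #{length(children)} direct children"
  end

  # Failure: because the operation is atomic, no job of the flow was added.
  def describe({:error, reason}) do
    "flow was not added: #{inspect(reason)}"
  end
end

# Intended use (requires a running Redis connection, so it is left commented out):
# flow |> BullMQ.FlowProducer.add(connection: :redis) |> FlowAddSketch.describe()
```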
@spec add_bulk([flow_node()], keyword()) :: {:ok, [flow_result()]} | {:error, term()}
Adds multiple job flows atomically.
All flows are added in a single transaction - either all succeed or none do. This matches the Node.js FlowProducer.addBulk behavior.
Examples
    flows = [
      %{name: "job1", queue_name: "q1", data: %{}, children: [...]},
      %{name: "job2", queue_name: "q2", data: %{}, children: [...]}
    ]

    {:ok, results} = BullMQ.FlowProducer.add_bulk(flows, connection: :redis)
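In practice the list of flows is often generated rather than written by hand. The sketch below builds one flow per order id, reusing the job and queue names from the Usage example. The module BulkFlowSketch and the function order_flows/1 are hypothetical, and the add_bulk/2 call is left as a comment because it needs a live Redis connection.

```elixir
defmodule BulkFlowSketch do
  # Builds one flow definition per order id.
  def order_flows(order_ids) do
    Enum.map(order_ids, fn order_id ->
      data = %{order_id: order_id}

      %{
        name: "process_order",
        queue_name: "orders",
        data: data,
        children: [
          %{name: "validate", queue_name: "validation", data: data},
          %{name: "process_payment", queue_name: "payments", data: data}
        ]
      }
    end)
  end
end

flows = BulkFlowSketch.order_flows([101, 102])

# All generated flows would then be submitted in a single transaction:
# {:ok, results} = BullMQ.FlowProducer.add_bulk(flows, connection: :redis)
```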