Introduction
View SourceBullMQ is a robust, feature-rich message queue and job scheduling library for Elixir, built on top of Redis. It's a port of the popular BullMQ library from the Node.js ecosystem, providing full compatibility with existing BullMQ queues.
Features
- High Performance: Leverages Redis for fast, reliable message passing
- Job Scheduling: Schedule jobs to run at specific times or intervals
- Priority Queues: Process high-priority jobs first
- Retry Strategies: Automatic retries with configurable backoff
- Rate Limiting: Control job processing rates
- Parent-Child Jobs: Create complex job hierarchies with dependencies
- Real-time Events: Subscribe to job lifecycle events via Worker callbacks or QueueEvents
- Concurrency Control: Process multiple jobs simultaneously
- Stalled Job Recovery: Automatically recover jobs from crashed workers
- Telemetry Integration: Built-in observability with Telemetry
- OTP Design: Built using GenServers, Supervisors, and other OTP patterns
- Node.js Compatibility: Share queues between Elixir and Node.js workers
Quick Start
Add BullMQ to your dependencies:
def deps do
[
{:bullmq, "~> 1.0"}
]
endAdd jobs to a queue:
# Add a job (stateless API)
{:ok, job} = BullMQ.Queue.add("emails", "send-welcome", %{
to: "user@example.com",
template: "welcome"
}, connection: :my_redis)Process jobs with a worker:
defmodule MyApp.EmailWorker do
def process(%BullMQ.Job{name: "send-welcome", data: data}) do
MyApp.Mailer.send_welcome(data["to"], data["template"])
{:ok, %{sent: true}}
end
end
# Start the worker with event callbacks
{:ok, worker} = BullMQ.Worker.start_link(
queue: "emails",
connection: :my_redis,
processor: &MyApp.EmailWorker.process/1,
concurrency: 5,
on_completed: fn job, result ->
Logger.info("Job #{job.id} completed")
end
)Architecture
BullMQ uses Redis data structures to implement a reliable, distributed job queue:
- Lists for FIFO job queues (waiting, active)
- Sorted Sets for priority queues, delayed jobs, and rate limiting
- Hashes for job data storage
- Streams for real-time event delivery
- Lua Scripts for atomic operations
The Elixir port leverages OTP patterns:
- GenServer for stateful components (Worker, QueueEvents)
- Supervisor for fault tolerance
- Telemetry for observability
- True Parallelism using multiple BEAM processes for concurrent job processing
API Design
BullMQ for Elixir provides both stateless and stateful APIs:
Stateless API (Recommended)
Most queue operations work as simple function calls with a connection:
# Add a job
{:ok, job} = BullMQ.Queue.add("my_queue", "job_name", %{data: "value"},
connection: :my_redis)
# Get job counts
{:ok, counts} = BullMQ.Queue.get_counts("my_queue", connection: :my_redis)
# Pause queue
:ok = BullMQ.Queue.pause("my_queue", connection: :my_redis)Stateful API (GenServer)
Workers and QueueEvents run as supervised processes:
# Worker as GenServer
{:ok, worker} = BullMQ.Worker.start_link(
queue: "my_queue",
connection: :my_redis,
processor: &process/1
)
# QueueEvents as GenServer
{:ok, events} = BullMQ.QueueEvents.start_link(
queue: "my_queue",
connection: :my_redis
)Next Steps
- Read the Getting Started guide
- Learn about Workers
- Explore Job Options
- Understand Queue Events
- Set up Job Schedulers
- Configure Rate Limiting
- Add Telemetry