BullMQ.QueueEvents (BullMQ v1.0.1)
View SourceSubscribe to queue events in real-time.
QueueEvents uses Redis Streams to listen for job lifecycle events. This enables reactive patterns and monitoring of job processing.
Usage
# Start the event listener
{:ok, pid} = BullMQ.QueueEvents.start_link(
queue: "my_queue",
connection: :redis
)
# Subscribe to events
BullMQ.QueueEvents.subscribe(pid, self())
# Receive events
receive do
{:bullmq_event, :completed, %{job_id: id, returnvalue: value}} ->
IO.puts("Job #{id} completed with #{value}")
{:bullmq_event, :failed, %{job_id: id, failed_reason: reason}} ->
IO.puts("Job #{id} failed: #{reason}")
endEvents
The following events are emitted:
:added- Job was added to the queue:waiting- Job is waiting to be processed:active- Job started processing:progress- Job progress was updated:completed- Job completed successfully:failed- Job failed:delayed- Job was delayed:stalled- Job was detected as stalled:removed- Job was removed:drained- Queue has no more waiting jobs:paused- Queue was paused:resumed- Queue was resumed
Handler Module
For more structured event handling, implement a handler module:
defmodule MyApp.QueueHandler do
use BullMQ.QueueEvents.Handler
@impl true
def handle_event(:completed, %{job_id: id, returnvalue: value}, state) do
Logger.info("Job #{id} completed")
{:ok, state}
end
@impl true
def handle_event(:failed, %{job_id: id, failed_reason: reason}, state) do
Logger.error("Job #{id} failed: #{reason}")
{:ok, state}
end
@impl true
def handle_event(_event, _data, state) do
{:ok, state}
end
end
# Use the handler
BullMQ.QueueEvents.start_link(
queue: "my_queue",
connection: :redis,
handler: MyApp.QueueHandler
)
Summary
Functions
Returns a specification to start this module under a supervisor.
Closes the event listener.
Starts the queue events listener.
Subscribes a process to receive queue events.
Unsubscribes a process from queue events.
Types
@type event() :: BullMQ.Types.queue_event()
@type event_data() :: map()
@type t() :: %BullMQ.QueueEvents{ blocking_conn: pid() | nil, closing: boolean(), connection: BullMQ.Types.redis_connection(), consumer_task: reference() | nil, handler: module() | nil, handler_state: term(), keys: BullMQ.Keys.queue_context(), last_event_id: String.t(), prefix: String.t(), queue_name: String.t(), running: boolean(), subscribers: [pid()] }
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec close(GenServer.server()) :: :ok
Closes the event listener.
@spec start_link(keyword()) :: GenServer.on_start()
Starts the queue events listener.
Options
:queue- Queue name (required):connection- Redis connection (required):prefix- Queue prefix (default: "bull"):handler- Handler module (optional):handler_state- Initial handler state (optional):last_event_id- Start from specific event ID (default: "$" for new events):autorun- Start listening immediately (default: true)
@spec subscribe(GenServer.server(), pid()) :: :ok
Subscribes a process to receive queue events.
Events are sent as {:bullmq_event, event_type, event_data} messages.
@spec unsubscribe(GenServer.server(), pid()) :: :ok
Unsubscribes a process from queue events.