BullMQ.QueueEvents (BullMQ v1.0.1)


Subscribe to queue events in real-time.

QueueEvents listens for job lifecycle events on a Redis Stream, which lets other processes react to job state changes and monitor processing as it happens.

Usage

# Start the event listener
{:ok, pid} = BullMQ.QueueEvents.start_link(
  queue: "my_queue",
  connection: :redis
)

# Subscribe to events
BullMQ.QueueEvents.subscribe(pid, self())

# Receive events
receive do
  {:bullmq_event, :completed, %{job_id: id, returnvalue: value}} ->
    # inspect/1 keeps this safe when the return value is not a string
    IO.puts("Job #{id} completed with #{inspect(value)}")

  {:bullmq_event, :failed, %{job_id: id, failed_reason: reason}} ->
    IO.puts("Job #{id} failed: #{reason}")
end

Events

The following events are emitted:

  • :added - Job was added to the queue
  • :waiting - Job is waiting to be processed
  • :active - Job started processing
  • :progress - Job progress was updated
  • :completed - Job completed successfully
  • :failed - Job failed
  • :delayed - Job was delayed
  • :stalled - Job was detected as stalled
  • :removed - Job was removed
  • :drained - Queue has no more waiting jobs
  • :paused - Queue was paused
  • :resumed - Queue was resumed
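As a minimal sketch of consuming these events, the loop below drains `{:bullmq_event, type, data}` messages from the mailbox and tags each one as job-level or queue-level. The module name `EventLoopSketch` is hypothetical, and the job/queue split is only read off the descriptions in the list above (`:drained`, `:paused`, and `:resumed` describe the queue; the rest describe a job). It is not something the library itself defines.

```elixir
defmodule EventLoopSketch do
  # Events whose descriptions above refer to the queue rather than a job.
  @queue_events [:drained, :paused, :resumed]

  # Collects up to `count` events from the mailbox, giving up after
  # `timeout` milliseconds without a matching message.
  def collect(count, timeout \\ 100), do: collect(count, timeout, [])

  defp collect(0, _timeout, acc), do: Enum.reverse(acc)

  defp collect(count, timeout, acc) do
    receive do
      {:bullmq_event, type, data} when type in @queue_events ->
        collect(count - 1, timeout, [{:queue, type, data} | acc])

      {:bullmq_event, type, data} when is_atom(type) ->
        collect(count - 1, timeout, [{:job, type, data} | acc])
    after
      timeout -> Enum.reverse(acc)
    end
  end
end
```

A process that has called `BullMQ.QueueEvents.subscribe/2` could call `EventLoopSketch.collect(10)` to gather the next ten events. The payload keys for each event type are not listed here, so the sketch passes `data` through untouched.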

Handler Module

For more structured event handling, implement a handler module:

defmodule MyApp.QueueHandler do
  use BullMQ.QueueEvents.Handler

  # Logger's macros must be required before use
  require Logger

  @impl true
  def handle_event(:completed, %{job_id: id, returnvalue: _value}, state) do
    Logger.info("Job #{id} completed")
    {:ok, state}
  end

  @impl true
  def handle_event(:failed, %{job_id: id, failed_reason: reason}, state) do
    Logger.error("Job #{id} failed: #{reason}")
    {:ok, state}
  end

  @impl true
  def handle_event(_event, _data, state) do
    {:ok, state}
  end
end

# Use the handler
BullMQ.QueueEvents.start_link(
  queue: "my_queue",
  connection: :redis,
  handler: MyApp.QueueHandler
)
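Because `handle_event/3` receives a state and returns `{:ok, state}`, a handler can accumulate data across events. The sketch below counts failures per job id. `FailureCounterSketch` is a hypothetical module that copies the callback shape shown above but omits the `use BullMQ.QueueEvents.Handler` line so it runs without the library. The `Enum.reduce/3` call only imitates how a listener might thread the state between calls, which is an assumption about the library's internals.

```elixir
defmodule FailureCounterSketch do
  # Same callback shape as the handler above, without the library dependency.

  def handle_event(:failed, %{job_id: id}, state) do
    # Increment the failure count for this job id, starting at 1.
    {:ok, Map.update(state, id, 1, &(&1 + 1))}
  end

  def handle_event(_event, _data, state), do: {:ok, state}
end

# Hand-written events, shaped like the payloads matched in the examples above.
events = [
  {:failed, %{job_id: "a", failed_reason: "timeout"}},
  {:completed, %{job_id: "b", returnvalue: 1}},
  {:failed, %{job_id: "a", failed_reason: "timeout"}}
]

# Thread the state through each call, starting from an empty map.
final_state =
  Enum.reduce(events, %{}, fn {event, data}, state ->
    {:ok, new_state} = FailureCounterSketch.handle_event(event, data, state)
    new_state
  end)
```

With the real library, the starting map would presumably be supplied through the `:handler_state` option of `start_link/1`.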

Types

event()

@type event() :: BullMQ.Types.queue_event()

event_data()

@type event_data() :: map()

t()

@type t() :: %BullMQ.QueueEvents{
  blocking_conn: pid() | nil,
  closing: boolean(),
  connection: BullMQ.Types.redis_connection(),
  consumer_task: reference() | nil,
  handler: module() | nil,
  handler_state: term(),
  keys: BullMQ.Keys.queue_context(),
  last_event_id: String.t(),
  prefix: String.t(),
  queue_name: String.t(),
  running: boolean(),
  subscribers: [pid()]
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close(server)

@spec close(GenServer.server()) :: :ok

Closes the event listener.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts the queue events listener.

Options

  • :queue - Queue name (required)
  • :connection - Redis connection (required)
  • :prefix - Queue prefix (default: "bull")
  • :handler - Handler module (optional)
  • :handler_state - Initial handler state (optional)
  • :last_event_id - Start from a specific event ID (default: "$", which delivers only new events)
  • :autorun - Start listening immediately (default: true)
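To make the required options and defaults concrete, here is a small sketch of a helper that checks for the two required keys and fills in the documented defaults. `QueueEventsOptsSketch.build/1` is hypothetical and not part of BullMQ. How the library validates its options internally is not shown on this page, so treat this purely as an illustration of the table above.

```elixir
defmodule QueueEventsOptsSketch do
  # Defaults copied from the option list above.
  @defaults [prefix: "bull", last_event_id: "$", autorun: true]
  @required [:queue, :connection]

  # Returns {:ok, opts} with defaults filled in,
  # or {:error, {:missing, keys}} when a required option is absent.
  def build(opts) when is_list(opts) do
    case Enum.reject(@required, &Keyword.has_key?(opts, &1)) do
      [] -> {:ok, Keyword.merge(@defaults, opts)}
      missing -> {:error, {:missing, missing}}
    end
  end
end
```

For example, `QueueEventsOptsSketch.build(queue: "my_queue", connection: :redis)` yields a keyword list that could then be passed to `BullMQ.QueueEvents.start_link/1`, while omitting `:connection` returns an error tuple naming the missing key.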

subscribe(server, pid \\ self())

@spec subscribe(GenServer.server(), pid()) :: :ok

Subscribes a process to receive queue events.

Events are sent as {:bullmq_event, event_type, event_data} messages.
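Since events arrive as ordinary messages, a long-lived subscriber is often easiest to write as a GenServer that handles them in `handle_info/2`. The sketch below is one possible shape. `EventCollectorSketch` is a hypothetical module that simply stores every event it receives, and it does not depend on BullMQ itself.

```elixir
defmodule EventCollectorSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns the events seen so far, oldest first.
  def events(server), do: GenServer.call(server, :events)

  @impl true
  def init(:ok), do: {:ok, []}

  @impl true
  def handle_call(:events, _from, acc), do: {:reply, Enum.reverse(acc), acc}

  @impl true
  def handle_info({:bullmq_event, type, data}, acc) do
    # Prepend for O(1) insertion; events/1 reverses on read.
    {:noreply, [{type, data} | acc]}
  end

  # Ignore unrelated messages so they do not crash the process.
  def handle_info(_other, acc), do: {:noreply, acc}
end
```

After starting the collector, passing its pid as the second argument to `BullMQ.QueueEvents.subscribe/2` would route queue events to it.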

unsubscribe(server, pid \\ self())

@spec unsubscribe(GenServer.server(), pid()) :: :ok

Unsubscribes a process from queue events.