BullMQ.Telemetry (BullMQ v1.0.1)

Telemetry integration for BullMQ.

BullMQ emits various telemetry events that you can attach handlers to for metrics, logging, and monitoring.

Event Names

All event names are lists of atoms that begin with :bullmq, for example [:bullmq, :job, :add].

Job Events

  • [:bullmq, :job, :add] - A job was added to the queue

    • Measurements: %{queue_time: native_time} (time to add to Redis)
    • Metadata: %{queue: name, job_id: id, job_name: name}
  • [:bullmq, :job, :start] - A job started processing

    • Measurements: %{system_time: native_time}
    • Metadata: %{queue: name, job_id: id, job_name: name, worker: pid}
  • [:bullmq, :job, :complete] - A job completed successfully

    • Measurements: %{duration: native_time}
    • Metadata: %{queue: name, job_id: id, job_name: name, worker: pid}
  • [:bullmq, :job, :fail] - A job failed

    • Measurements: %{duration: native_time}
    • Metadata: %{queue: name, job_id: id, job_name: name, worker: pid, error: term}
  • [:bullmq, :job, :retry] - A job is being retried

    • Measurements: %{attempt: integer, delay: ms}
    • Metadata: %{queue: name, job_id: id, job_name: name}
  • [:bullmq, :job, :progress] - Job progress updated

    • Measurements: %{progress: 0..100}
    • Metadata: %{queue: name, job_id: id}

Worker Events

  • [:bullmq, :worker, :start] - Worker started

    • Measurements: %{concurrency: integer}
    • Metadata: %{queue: name, worker: pid}
  • [:bullmq, :worker, :stop] - Worker stopped

    • Measurements: %{uptime: native_time}
    • Metadata: %{queue: name, worker: pid}
  • [:bullmq, :worker, :stalled_check] - Stalled job check executed

    • Measurements: %{recovered: integer, failed: integer}
    • Metadata: %{queue: name}

Queue Events

  • [:bullmq, :queue, :pause] - Queue was paused
  • [:bullmq, :queue, :resume] - Queue was resumed
  • [:bullmq, :queue, :drain] - Queue was drained

Rate Limiting Events

  • [:bullmq, :rate_limit, :hit] - Rate limit was hit
    • Measurements: %{delay: ms}
    • Metadata: %{queue: name}
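
Several measurements above (queue_time, duration, uptime) are reported in native time units, whose resolution depends on the runtime. A minimal sketch of converting them before logging or exporting follows. The module name MyApp.TelemetryUnits and its function names are hypothetical; only System.convert_time_unit/3 from the standard library is used.

```elixir
defmodule MyApp.TelemetryUnits do
  # Hypothetical helper module; not part of BullMQ.

  @doc "Converts a native-time measurement to whole milliseconds."
  def to_ms(native) when is_integer(native) do
    System.convert_time_unit(native, :native, :millisecond)
  end

  @doc "Converts a native-time measurement to seconds as a float."
  def to_seconds(native) when is_integer(native) do
    System.convert_time_unit(native, :native, :microsecond) / 1_000_000
  end
end

# Express 1500 ms in native units, then convert it back both ways.
native = System.convert_time_unit(1_500, :millisecond, :native)
IO.inspect(MyApp.TelemetryUnits.to_ms(native))
IO.inspect(MyApp.TelemetryUnits.to_seconds(native))
```

Converting at the edge (in the handler) keeps the raw measurement untouched for other handlers that may prefer a different unit.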

Example Setup

# In your application.ex
:telemetry.attach_many(
  "bullmq-logger",
  [
    [:bullmq, :job, :complete],
    [:bullmq, :job, :fail],
    [:bullmq, :rate_limit, :hit]
  ],
  &MyApp.Telemetry.handle_event/4,
  nil
)
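
The handler passed to :telemetry.attach_many/4 above receives every attached event, so it needs a clause for each one. A minimal sketch of a logging handler for those three events, assuming the measurement and metadata keys listed under Event Names; the module name MyApp.BullMQLogger is hypothetical.

```elixir
defmodule MyApp.BullMQLogger do
  # Hypothetical module name; the keys read below follow the Event Names list.
  require Logger

  def handle_event([:bullmq, :job, :complete], %{duration: duration}, metadata, _config) do
    # Durations are in native time units; convert before logging.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    Logger.info("job #{metadata.job_id} on #{metadata.queue} completed in #{ms}ms")
  end

  def handle_event([:bullmq, :job, :fail], _measurements, metadata, _config) do
    Logger.error("job #{metadata.job_id} on #{metadata.queue} failed: #{inspect(metadata.error)}")
  end

  def handle_event([:bullmq, :rate_limit, :hit], %{delay: delay}, metadata, _config) do
    Logger.warning("queue #{metadata.queue} rate limited, delayed #{delay}ms")
  end

  # Catch-all so an unexpected event never raises; :telemetry detaches
  # a handler that raises.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end
```

To use it, pass &MyApp.BullMQLogger.handle_event/4 as the handler function. A named function capture is preferable to an anonymous function here, since :telemetry warns about the performance of local function handlers.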

Prometheus/StatsD Integration

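defmodule MyApp.Telemetry do
  def handle_event([:bullmq, :job, :complete], measurements, metadata, _config) do
    # Durations are in native time units; convert explicitly rather than assuming nanoseconds.
    seconds = System.convert_time_unit(measurements.duration, :native, :microsecond) / 1_000_000

    :prometheus_histogram.observe(:job_duration_seconds, [metadata.queue], seconds)
  end

  def handle_event([:bullmq, :job, :fail], _measurements, metadata, _config) do
    :prometheus_counter.inc(:job_failures_total, [metadata.queue])
  end

  # Ignore any other attached event (for example [:bullmq, :rate_limit, :hit])
  # so the handler never raises; :telemetry detaches handlers that raise.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end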
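
As an alternative to calling a metrics client by hand, the same events can be described declaratively with the separate telemetry_metrics package and handed to a reporter. A sketch, assuming that package is available and that the measurement keys match those listed under Event Names; MyApp.BullMQMetrics is a hypothetical module name.

```elixir
# Standalone script form; in a Mix project, add :telemetry_metrics to mix.exs instead.
Mix.install([{:telemetry_metrics, "~> 1.0"}])

defmodule MyApp.BullMQMetrics do
  # Hypothetical module; not part of BullMQ.
  import Telemetry.Metrics

  def metrics do
    [
      # Job duration, converted from native time units to milliseconds.
      summary("bullmq.job.complete.duration", unit: {:native, :millisecond}, tags: [:queue]),
      # Number of failed jobs per queue.
      counter("bullmq.job.fail.count", tags: [:queue]),
      # Total jobs recovered by stalled-job checks.
      sum("bullmq.worker.stalled_check.recovered", tags: [:queue]),
      # Most recent rate-limit delay in milliseconds.
      last_value("bullmq.rate_limit.hit.delay", tags: [:queue])
    ]
  end
end

IO.inspect(Enum.map(MyApp.BullMQMetrics.metrics(), & &1.event_name))
```

The resulting list is meant to be passed to a Telemetry.Metrics reporter, for instance Telemetry.Metrics.ConsoleReporter during development. Which metric types are supported depends on the reporter you choose.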

Summary

Functions

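attach(handler_id, event_suffix, handler_fn, config \\ nil)

Attaches a handler function to BullMQ events.

attach_many(handler_id, event_suffixes, handler_fn, config \\ nil)

Attaches a handler to multiple BullMQ events.

emit(event_name, measurements, metadata)

Emits a telemetry event.

span(event_suffix, metadata, fun)

Spans a function call with telemetry events.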

Types

event_name()

@type event_name() :: atom()

measurements()

@type measurements() :: map()

metadata()

@type metadata() :: map()

Functions

attach(handler_id, event_suffix, handler_fn, config \\ nil)

@spec attach(String.t(), [atom()], (list(), map(), map(), term() -> :ok), term()) ::
  :ok | {:error, term()}

Attaches a handler function to BullMQ events.

This is a convenience wrapper around :telemetry.attach/4.

Example

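BullMQ.Telemetry.attach(
  "my-handler",
  [:job, :complete],
  fn _event, measurements, metadata, _config ->
    # duration is in native time units, so convert before printing
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    IO.puts("Job #{metadata.job_id} completed in #{ms}ms")
  end
)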
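
Since attach/4 is described as a wrapper around :telemetry.attach/4, handler ids presumably follow the same rules: an id must be unique, and a handler stays attached until it is detached. The sketch below illustrates this with :telemetry directly. TelemetrySketch is a hypothetical stand-in, and prepending :bullmq to the suffix is an assumption inferred from the example above, not the library's confirmed implementation.

```elixir
# Standalone script form; in a Mix project, :telemetry is a regular dependency.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule TelemetrySketch do
  # Hypothetical stand-in for BullMQ.Telemetry.attach/4 (assumed behaviour).
  def attach(handler_id, event_suffix, handler_fn, config \\ nil) do
    :telemetry.attach(handler_id, [:bullmq | event_suffix], handler_fn, config)
  end

  # Forwards each event to the pid given as the handler config.
  def forward(event, measurements, metadata, pid) do
    send(pid, {:telemetry_event, event, measurements, metadata})
    :ok
  end
end

:ok = TelemetrySketch.attach("my-handler", [:job, :complete], &TelemetrySketch.forward/4, self())

# Attaching again under the same id is rejected.
{:error, :already_exists} =
  TelemetrySketch.attach("my-handler", [:job, :complete], &TelemetrySketch.forward/4, self())

# Simulate the event BullMQ would emit and receive it in this process.
:telemetry.execute([:bullmq, :job, :complete], %{duration: 42}, %{job_id: "123"})

receive do
  {:telemetry_event, event, measurements, _metadata} ->
    IO.inspect({event, measurements})
after
  100 -> IO.puts("no event received")
end

# Detach when the handler is no longer needed.
:ok = :telemetry.detach("my-handler")
```

Passing the receiving pid as the handler config keeps the handler a plain module function, which is handy in tests that need to assert on emitted events.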

attach_many(handler_id, event_suffixes, handler_fn, config \\ nil)

@spec attach_many(
  String.t(),
  [[atom()]],
  (list(), map(), map(), term() -> :ok),
  term()
) ::
  :ok | {:error, term()}

Attaches a handler to multiple BullMQ events.

Example

# Handler ids must be unique, so use a different id than any other attached handler.
BullMQ.Telemetry.attach_many("my-multi-handler", [
  [:job, :complete],
  [:job, :fail],
  [:worker, :start]
], &handle_event/4)

emit(event_name, measurements, metadata)

@spec emit(event_name() | [event_name()], measurements(), metadata()) :: :ok

Emits a telemetry event.

This is used internally by BullMQ. You typically don't need to call this directly.

span(event_suffix, metadata, fun)

@spec span([atom()], metadata(), (-> result)) :: result when result: term()

Spans a function call with telemetry events.

Emits a start event before the function runs, then a stop event once it returns or an exception event if it raises. Returns the function's result.

Example

BullMQ.Telemetry.span([:job, :process], %{job_id: "123"}, fn ->
  do_work()
end)
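
To observe a span, handlers attach to the events emitted around the function. The sketch below uses :telemetry.span/3 directly and assumes that BullMQ.Telemetry.span/3 prefixes the suffix with :bullmq and emits [..., :start], [..., :stop] and [..., :exception] in the same way. That mapping, the resulting event names, and the module name SpanSketch are assumptions rather than documented behaviour.

```elixir
# Standalone script form; in a Mix project, :telemetry is a regular dependency.
Mix.install([{:telemetry, "~> 1.0"}])

defmodule SpanSketch do
  # Hypothetical stand-in for BullMQ.Telemetry.span/3 (assumed behaviour).
  def span(event_suffix, metadata, fun) do
    # :telemetry.span/3 expects the function to return {result, stop_metadata}.
    :telemetry.span([:bullmq | event_suffix], metadata, fn -> {fun.(), metadata} end)
  end

  # Forwards each event and its measurements to the pid given as config.
  def forward(event, measurements, _metadata, pid) do
    send(pid, {event, measurements})
    :ok
  end
end

:ok =
  :telemetry.attach_many(
    "span-observer",
    [
      [:bullmq, :job, :process, :start],
      [:bullmq, :job, :process, :stop],
      [:bullmq, :job, :process, :exception]
    ],
    &SpanSketch.forward/4,
    self()
  )

result = SpanSketch.span([:job, :process], %{job_id: "123"}, fn -> :done end)
IO.inspect(result)

# Collect the two events forwarded to this process during the span.
events =
  for _ <- 1..2 do
    receive do
      {event, _measurements} -> event
    after
      100 -> :timeout
    end
  end

IO.inspect(events)
```

With :telemetry.span/3, the stop event carries a :duration measurement in native time units, so the same unit conversion applies as for the job events.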