BullMQ.Telemetry (BullMQ v1.0.1)
Telemetry integration for BullMQ.
BullMQ emits various telemetry events that you can attach handlers to for metrics, logging, and monitoring.
Event Names
All events are prefixed with [:bullmq, ...].
Job Events
- [:bullmq, :job, :add] - A job was added to the queue
  - Measurements: %{queue_time: native_time} (time to add to Redis)
  - Metadata: %{queue: name, job_id: id, job_name: name}
- [:bullmq, :job, :start] - A job started processing
  - Measurements: %{system_time: native_time}
  - Metadata: %{queue: name, job_id: id, job_name: name, worker: pid}
- [:bullmq, :job, :complete] - A job completed successfully
  - Measurements: %{duration: native_time}
  - Metadata: %{queue: name, job_id: id, job_name: name, worker: pid}
- [:bullmq, :job, :fail] - A job failed
  - Measurements: %{duration: native_time}
  - Metadata: %{queue: name, job_id: id, job_name: name, worker: pid, error: term}
- [:bullmq, :job, :retry] - A job is being retried
  - Measurements: %{attempt: integer, delay: ms}
  - Metadata: %{queue: name, job_id: id, job_name: name}
- [:bullmq, :job, :progress] - Job progress updated
  - Measurements: %{progress: 0..100}
  - Metadata: %{queue: name, job_id: id}
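The measurement and metadata shapes listed for the job events can be pattern-matched directly in a handler. The following is a minimal sketch, not part of BullMQ: the module name `JobEventLogSketch` and its `format/3` helper are hypothetical, and the message wording is arbitrary. It assumes only the fields documented in the list, and converts `native_time` durations with `System.convert_time_unit/3` rather than assuming a particular unit.

```elixir
defmodule JobEventLogSketch do
  # Hypothetical helper, not part of BullMQ: turns job events into log lines.
  require Logger

  # Entry point with the 4-arity shape that :telemetry handlers use.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # :complete and :fail carry a duration in native time units.
  def format([:bullmq, :job, :complete], %{duration: d}, meta) do
    "job #{meta.job_id} on #{meta.queue} completed in #{to_ms(d)}ms"
  end

  def format([:bullmq, :job, :fail], %{duration: d}, meta) do
    "job #{meta.job_id} on #{meta.queue} failed after #{to_ms(d)}ms: #{inspect(meta.error)}"
  end

  # :retry reports the attempt number and the delay in milliseconds.
  def format([:bullmq, :job, :retry], %{attempt: n, delay: ms}, meta) do
    "job #{meta.job_id} on #{meta.queue} retrying (attempt #{n}) in #{ms}ms"
  end

  # :progress reports a value in 0..100.
  def format([:bullmq, :job, :progress], %{progress: p}, meta) do
    "job #{meta.job_id} on #{meta.queue} at #{p}%"
  end

  # Catch-all so an unexpected event never raises inside the handler.
  def format(event, _measurements, _metadata), do: "unhandled event #{inspect(event)}"

  defp to_ms(native), do: System.convert_time_unit(native, :native, :millisecond)
end
```

Keeping the formatting in a pure function makes each clause easy to check in isolation, without attaching anything to a running queue.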
Worker Events
- [:bullmq, :worker, :start] - Worker started
  - Measurements: %{concurrency: integer}
  - Metadata: %{queue: name, worker: pid}
- [:bullmq, :worker, :stop] - Worker stopped
  - Measurements: %{uptime: native_time}
  - Metadata: %{queue: name, worker: pid}
- [:bullmq, :worker, :stalled_check] - Stalled job check executed
  - Measurements: %{recovered: integer, failed: integer}
  - Metadata: %{queue: name}
Queue Events
- [:bullmq, :queue, :pause] - Queue was paused
- [:bullmq, :queue, :resume] - Queue was resumed
- [:bullmq, :queue, :drain] - Queue was drained
Rate Limiting Events
- [:bullmq, :rate_limit, :hit] - Rate limit was hit
  - Measurements: %{delay: ms}
  - Metadata: %{queue: name}
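As one way to use the rate limit event, a handler can keep running totals of how often the limit is hit and how much delay it adds. The sketch below is an illustration under assumptions: `RateLimitStatsSketch` is a hypothetical module, the totals live in an Erlang `:counters` array passed as the handler config, and `delay` is assumed to be an integer number of milliseconds.

```elixir
defmodule RateLimitStatsSketch do
  # Hypothetical module: accumulates rate-limit hits in an Erlang :counters array.
  @hits 1
  @delay_ms 2

  # Two slots: slot 1 counts hits, slot 2 sums the reported delay in ms.
  def new, do: :counters.new(2, [:atomics])

  # Handler with the :telemetry 4-arity shape; the counters reference
  # arrives as the handler config. Assumes `delay` is an integer.
  def handle_event([:bullmq, :rate_limit, :hit], %{delay: delay}, _metadata, counters) do
    :counters.add(counters, @hits, 1)
    :counters.add(counters, @delay_ms, delay)
  end

  # Read both totals as a map.
  def snapshot(counters) do
    %{hits: :counters.get(counters, @hits), total_delay_ms: :counters.get(counters, @delay_ms)}
  end
end
```

To wire it up, the reference returned by `RateLimitStatsSketch.new/0` would be passed as the last argument of `:telemetry.attach/4`, with `&RateLimitStatsSketch.handle_event/4` as the handler function.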
Example Setup
# In your application.ex
:telemetry.attach_many(
  "bullmq-logger",
  [
    [:bullmq, :job, :complete],
    [:bullmq, :job, :fail],
    [:bullmq, :rate_limit, :hit]
  ],
  &MyApp.Telemetry.handle_event/4,
  nil
)
Prometheus/StatsD Integration
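defmodule MyApp.Telemetry do
  def handle_event([:bullmq, :job, :complete], measurements, metadata, _config) do
    # duration is in native time units, which are not guaranteed to be
    # nanoseconds, so convert explicitly before reporting seconds
    seconds = System.convert_time_unit(measurements.duration, :native, :microsecond) / 1_000_000

    :prometheus_histogram.observe(:job_duration_seconds, [metadata.queue], seconds)
  end

  def handle_event([:bullmq, :job, :fail], _measurements, metadata, _config) do
    :prometheus_counter.inc(:job_failures_total, [metadata.queue])
  end

  # Ignore attached events without a dedicated clause (such as
  # [:bullmq, :rate_limit, :hit]); :telemetry detaches a handler that raises
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end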
Functions
@spec attach(String.t(), [atom()], (list(), map(), map(), term() -> :ok), term()) :: :ok | {:error, term()}
Attaches a handler function to BullMQ events.
This is a convenience wrapper around :telemetry.attach/4.
Example
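BullMQ.Telemetry.attach(
  "my-handler",
  [:job, :complete],
  fn _event, measurements, metadata, _config ->
    # duration is in native time units; convert before printing
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    IO.puts("Job #{metadata.job_id} completed in #{ms}ms")
  end
)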
@spec attach_many(String.t(), [[atom()]], (list(), map(), map(), term() -> :ok), term()) :: :ok | {:error, term()}
Attaches a handler to multiple BullMQ events.
Example
BullMQ.Telemetry.attach_many("my-handler", [
  [:job, :complete],
  [:job, :fail],
  [:worker, :start]
], &handle_event/4)
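The last `term()` in the spec is presumably the handler config that is passed back as the fourth argument of the handler, as with `:telemetry.attach_many/4`; that reading is an assumption, since it is not spelled out here. Under that assumption, a handler that forwards events to a pid is a convenient way to observe events in tests. `ForwardingHandlerSketch` and its `flush/2` helper are hypothetical names.

```elixir
defmodule ForwardingHandlerSketch do
  # Hypothetical handler: forwards every event to the pid given as handler config.
  # Assumed wiring (config passed as the fourth argument of attach_many):
  #
  #   BullMQ.Telemetry.attach_many("test-handler", [[:job, :complete], [:job, :fail]],
  #     &ForwardingHandlerSketch.handle_event/4, self())
  def handle_event(event, measurements, metadata, pid) when is_pid(pid) do
    send(pid, {:bullmq_event, event, measurements, metadata})
    :ok
  end

  # Collect forwarded events from the caller's mailbox in arrival order,
  # waiting up to `timeout` ms for each one before giving up.
  def flush(timeout \\ 100, acc \\ []) do
    receive do
      {:bullmq_event, event, measurements, metadata} ->
        flush(timeout, [{event, measurements, metadata} | acc])
    after
      timeout -> Enum.reverse(acc)
    end
  end
end
```

Because the handler accepts any event name, it never raises on an event it was not written for, so it stays attached regardless of which events it is registered against.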
@spec emit(event_name() | [event_name()], measurements(), metadata()) :: :ok
Emits a telemetry event.
This is used internally by BullMQ. You typically don't need to call this directly.
Spans a function call with telemetry events.
Emits start and stop/exception events around the function call.
Example
BullMQ.Telemetry.span([:job, :process], %{job_id: "123"}, fn ->
  do_work()
end)
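To make the start and stop/exception behavior concrete, here is a rough sketch of what a span wrapper does in general. It is not BullMQ's implementation: `SpanSketch` is a hypothetical module, `emit` stands in for whatever function publishes the event, the `:start`/`:stop`/`:exception` suffixes follow the `:telemetry.span/3` naming convention, and the measurement and metadata keys are placeholders.

```elixir
defmodule SpanSketch do
  # Illustration only, not BullMQ's implementation. `emit` is any 3-arity
  # function (event, measurements, metadata). Event suffixes and measurement
  # keys are assumptions made for this sketch.
  def span(event, metadata, fun, emit) when is_function(fun, 0) and is_function(emit, 3) do
    started = System.monotonic_time()
    emit.(event ++ [:start], %{system_time: System.system_time()}, metadata)

    try do
      fun.()
    rescue
      error ->
        # Only raised exceptions are reported; throws and exits pass
        # through without an event in this simplified version.
        duration = System.monotonic_time() - started
        emit.(event ++ [:exception], %{duration: duration}, Map.put(metadata, :error, error))
        reraise error, __STACKTRACE__
    else
      result ->
        # Normal return: report the elapsed native time, then hand back the result.
        emit.(event ++ [:stop], %{duration: System.monotonic_time() - started}, metadata)
        result
    end
  end
end
```

Measuring with `System.monotonic_time/0` keeps the duration unaffected by wall-clock adjustments, and re-raising preserves the original stacktrace for the caller.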