BullMQ.Telemetry.OpenTelemetry (BullMQ v1.3.2)
OpenTelemetry adapter for BullMQ distributed tracing.
This module implements the BullMQ.Telemetry.Behaviour using OpenTelemetry APIs.
It provides distributed tracing capabilities that allow you to:
- Trace jobs across queue boundaries
- Link spans from job producers to job consumers
- Propagate trace context through Redis
- Integrate with OpenTelemetry backends (Jaeger, Zipkin, Honeycomb, etc.)
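To make "propagate trace context through Redis" concrete, here is a minimal sketch of the kind of string that could travel with a job. It assumes the W3C `traceparent` format (`version-trace_id-span_id-flags`); whether BullMQ stores exactly this format is an assumption, and `TraceContextSketch` is a hypothetical module, not part of the library.

```elixir
defmodule TraceContextSketch do
  @moduledoc """
  Hypothetical helper, not part of BullMQ: encodes and decodes a W3C
  `traceparent` string such as the one a job could carry through Redis.
  """

  # Build "version-trace_id-span_id-flags" from integer IDs.
  def encode(trace_id, span_id, sampled?) do
    flags = if sampled?, do: "01", else: "00"
    Enum.join(["00", hex(trace_id, 32), hex(span_id, 16), flags], "-")
  end

  # Parse a traceparent string back into its parts.
  # Simplification: only the exact flag value "01" is treated as sampled.
  def decode(traceparent) do
    with ["00", trace_hex, span_hex, flags] <- String.split(traceparent, "-"),
         32 <- byte_size(trace_hex),
         16 <- byte_size(span_hex),
         {trace_id, ""} <- Integer.parse(trace_hex, 16),
         {span_id, ""} <- Integer.parse(span_hex, 16) do
      {:ok, %{trace_id: trace_id, span_id: span_id, sampled?: flags == "01"}}
    else
      _ -> :error
    end
  end

  # Lowercase hex, left-padded with zeros to a fixed width.
  defp hex(int, width) do
    int
    |> Integer.to_string(16)
    |> String.downcase()
    |> String.pad_leading(width, "0")
  end
end
```

A producer would store the encoded string alongside the job data, and a consumer would decode it to link its span to the producer's trace.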
Prerequisites
This module requires the opentelemetry_api package. Add it to your dependencies:
# mix.exs
defp deps do
  [
    {:bullmq, "~> 1.3"},
    {:opentelemetry_api, "~> 1.0"},
    {:opentelemetry, "~> 1.0"},
    # Choose your exporter
    {:opentelemetry_exporter, "~> 1.0"}
  ]
end

Configuration
Configure OpenTelemetry in your application:
# config/config.exs
config :opentelemetry,
  span_processor: :batch,
  traces_exporter: :otlp

config :opentelemetry_exporter,
  otlp_protocol: :grpc,
  otlp_endpoint: "http://localhost:4317"

Usage
# When adding jobs, trace context is automatically propagated
{:ok, queue} = BullMQ.Queue.start_link(
  name: :my_queue,
  connection: :redis,
  telemetry: BullMQ.Telemetry.OpenTelemetry
)

# Create a parent span and add jobs within it
require OpenTelemetry.Tracer, as: Tracer

Tracer.with_span "my_operation" do
  # Trace context from "my_operation" is propagated to the job
  {:ok, job} = BullMQ.Queue.add(queue, "email", %{to: "user@example.com"})
end

# When processing, the worker restores the trace context
{:ok, worker} = BullMQ.Worker.start_link(
  name: :my_worker,
  queue: "my_queue",
  connection: :redis,
  telemetry: BullMQ.Telemetry.OpenTelemetry,
  processor: fn job, _token ->
    # This span is linked to the original "my_operation" span
    Tracer.with_span "process_email" do
      send_email(job.data)
    end
  end
)

Span Naming Convention
BullMQ creates spans with the following names:
- bullmq.queue.add - When adding a single job
- bullmq.queue.add_bulk - When adding multiple jobs
- bullmq.worker.process - When processing a job
Span Attributes
Spans include the following semantic attributes:
- messaging.system - Always "bullmq"
- messaging.destination.name - The queue name
- messaging.message.id - The job ID
- messaging.operation - The operation type ("publish" or "receive")
- bullmq.job.name - The job name/type
- bullmq.job.priority - The job priority
- bullmq.job.delay - The job delay in ms
- bullmq.job.attempts - Number of attempts made
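As an illustration of the attribute list above, the following sketch builds such an attribute map from a job. `SpanAttributesSketch`, the plain-map job, its field names (`:id`, `:name`, `:priority`, `:delay`, `:attempts_made`) and the default value of 0 are all assumptions made for the example, not the adapter's internals.

```elixir
defmodule SpanAttributesSketch do
  # Hypothetical helper: `job` is a plain map standing in for a BullMQ job.
  # Only the two operation values named in the attribute list are accepted.
  def for_job(queue_name, job, operation) when operation in ["publish", "receive"] do
    %{
      "messaging.system" => "bullmq",
      "messaging.destination.name" => queue_name,
      "messaging.message.id" => job.id,
      "messaging.operation" => operation,
      "bullmq.job.name" => job.name,
      # Assumed default of 0 when the job map omits a field.
      "bullmq.job.priority" => Map.get(job, :priority, 0),
      "bullmq.job.delay" => Map.get(job, :delay, 0),
      "bullmq.job.attempts" => Map.get(job, :attempts_made, 0)
    }
  end
end
```

For example, `SpanAttributesSketch.for_job("emails", %{id: "42", name: "email", delay: 5000}, "publish")` would describe a delayed job at the moment it is added to the queue.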
Summary
Functions
available?() - Checks if OpenTelemetry is available and properly configured.
trace(name, opts, fun) - Traces a complete operation with automatic span lifecycle management.
Functions
available?()
@spec available?() :: boolean()
Checks if OpenTelemetry is available and properly configured.
Returns true if the required OpenTelemetry modules are loaded.
Note: This only checks that the modules are loaded. For spans to actually
be recorded, the opentelemetry SDK must also be installed and configured;
opentelemetry_api alone is not enough.
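A check of this kind can be sketched with `Code.ensure_loaded?/1`. Which modules the adapter actually inspects is an assumption here, and `AvailabilitySketch` is a hypothetical module written only to illustrate the idea.

```elixir
defmodule AvailabilitySketch do
  # Modules checked by default. These exist in opentelemetry_api, but
  # whether BullMQ checks exactly these is an assumption.
  @default_modules [OpenTelemetry, OpenTelemetry.Tracer, OpenTelemetry.Ctx]

  # Returns true only if every listed module can be loaded.
  def available?(modules \\ @default_modules) do
    Enum.all?(modules, &Code.ensure_loaded?/1)
  end
end
```

Because the check only looks at loaded modules, it says nothing about whether an exporter is configured, which matches the caveat in the note.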
trace(name, opts, fun)
@spec trace(
        String.t(),
        keyword(),
        (term() -> result) | (term(), String.t() | nil -> result)
      ) :: result
      when result: term()
Traces a complete operation with automatic span lifecycle management.
This is a convenience function that starts a span, runs the given function, and ends the span with the appropriate status.
Arguments
- name - The span name
- opts - Span options (see start_span/2)
- fun - The function to trace. Receives the span and optional propagation metadata.
Options
- :propagate - If true, the function receives a second argument with the serialized trace context for propagation
Examples
# Simple trace
BullMQ.Telemetry.OpenTelemetry.trace("my.operation", [], fn _span ->
  do_work()
end)

# Trace with context propagation (for queue.add)
BullMQ.Telemetry.OpenTelemetry.trace("queue.add", [propagate: true], fn _span, metadata ->
  # metadata contains serialized trace context to store in job
  {:ok, job} = add_job_with_metadata(metadata)
end)
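The lifecycle that trace/3 is described as managing (start a span, run the function, end the span with a status) can be sketched without any OpenTelemetry dependency. Everything in `TraceSketch` is a stand-in: the span is a plain map, `serialize_context/1` returns a made-up string, and "ending" the span just sends a message to the calling process so the behaviour is observable. It illustrates the pattern under those assumptions and is not the adapter's implementation.

```elixir
defmodule TraceSketch do
  # Accepts a 1-arity or 2-arity function, mirroring the documented spec.
  def trace(name, opts, fun) when is_function(fun, 1) or is_function(fun, 2) do
    # Stand-in for starting a real span.
    span = %{name: name, started_at: System.monotonic_time()}

    try do
      result =
        if is_function(fun, 2) do
          # The second argument is nil unless :propagate is set.
          metadata = if Keyword.get(opts, :propagate, false), do: serialize_context(span)
          fun.(span, metadata)
        else
          fun.(span)
        end

      finish(span, :ok)
      result
    rescue
      exception ->
        # Mark the span as failed, then let the error continue upward.
        finish(span, :error)
        reraise exception, __STACKTRACE__
    end
  end

  # Made-up serialization; a real adapter would emit propagation data.
  defp serialize_context(span), do: "ctx:" <> span.name

  # Stand-in for ending the span with a status.
  defp finish(span, status), do: send(self(), {:span_ended, span.name, status})
end
```

Dispatching on the function's arity keeps both call shapes from the examples working, and re-raising after recording the error status means tracing never swallows a failure.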