BullMQ.Telemetry.OpenTelemetry (BullMQ v1.3.2)

View Source

OpenTelemetry adapter for BullMQ distributed tracing.

This module implements the BullMQ.Telemetry.Behaviour using OpenTelemetry APIs. It provides distributed tracing capabilities that allow you to:

  • Trace jobs across queue boundaries
  • Link spans from job producers to job consumers
  • Propagate trace context through Redis
  • Integrate with OpenTelemetry backends (Jaeger, Zipkin, Honeycomb, etc.)

Prerequisites

This module requires the opentelemetry_api package. Add it to your dependencies:

# mix.exs
defp deps do
  [
    {:bullmq, "~> 0.1"},
    {:opentelemetry_api, "~> 1.0"},
    {:opentelemetry, "~> 1.0"},
    # Choose your exporter
    {:opentelemetry_exporter, "~> 1.0"}
  ]
end

Configuration

Configure OpenTelemetry in your application:

# config/config.exs
config :opentelemetry,
  span_processor: :batch,
  traces_exporter: :otlp

config :opentelemetry_exporter,
  otlp_protocol: :grpc,
  otlp_endpoint: "http://localhost:4317"

Usage

# When adding jobs, trace context is automatically propagated
{:ok, queue} = BullMQ.Queue.start_link(
  name: :my_queue,
  connection: :redis,
  telemetry: BullMQ.Telemetry.OpenTelemetry
)

# Create a parent span and add jobs within it
require OpenTelemetry.Tracer, as: Tracer
Tracer.with_span "my_operation" do
  # Trace context from "my_operation" is propagated to the job
  {:ok, job} = BullMQ.Queue.add(queue, "email", %{to: "user@example.com"})
end

# When processing, the worker restores the trace context
{:ok, worker} = BullMQ.Worker.start_link(
  name: :my_worker,
  queue: "my_queue",
  connection: :redis,
  telemetry: BullMQ.Telemetry.OpenTelemetry,
  processor: fn job, _token ->
    # This span is linked to the original "my_operation" span
    Tracer.with_span "process_email" do
      send_email(job.data)
    end
  end
)

Span Naming Convention

BullMQ creates spans with the following names:

  • bullmq.queue.add - When adding a single job
  • bullmq.queue.add_bulk - When adding multiple jobs
  • bullmq.worker.process - When processing a job

Span Attributes

Spans include the following semantic attributes:

  • messaging.system - Always "bullmq"
  • messaging.destination.name - The queue name
  • messaging.message.id - The job ID
  • messaging.operation - The operation type ("publish" or "receive")
  • bullmq.job.name - The job name/type
  • bullmq.job.priority - The job priority
  • bullmq.job.delay - The job delay in ms
  • bullmq.job.attempts - Number of attempts made

Summary

Functions

Checks if OpenTelemetry is available and properly configured.

Traces a complete operation with automatic span lifecycle management.

Functions

available?()

@spec available?() :: boolean()

Checks if OpenTelemetry is available and properly configured.

Returns true if the required OpenTelemetry modules are loaded. Note: This only checks if the modules are available. For the SDK to actually record spans, you need to have opentelemetry (the SDK) installed and configured, not just opentelemetry_api.

trace(name, opts \\ [], fun)

@spec trace(
  String.t(),
  keyword(),
  (term() -> result) | (term(), String.t() | nil -> result)
) :: result
when result: term()

Traces a complete operation with automatic span lifecycle management.

This is a convenience function that starts a span, runs the given function, and ends the span with the appropriate status.

Arguments

  • name - The span name
  • opts - Span options (see start_span/2)
  • fun - The function to trace. Receives the span and optional propagation metadata.

Options

  • :propagate - If true, the function receives a second argument with the serialized trace context for propagation

Examples

# Simple trace
BullMQ.Telemetry.OpenTelemetry.trace("my.operation", [], fn _span ->
  do_work()
end)

# Trace with context propagation (for queue.add)
BullMQ.Telemetry.OpenTelemetry.trace("queue.add", [propagate: true], fn _span, metadata ->
  # metadata contains serialized trace context to store in job
  {:ok, job} = add_job_with_metadata(metadata)
end)