BullMQ.Telemetry.Behaviour behaviour (BullMQ v1.3.2)
View SourceBehaviour for telemetry integrations in BullMQ.
This behaviour allows implementing distributed tracing with OpenTelemetry or other tracing systems. It provides callbacks for:
- Starting and ending spans
- Propagating trace context across job boundaries
- Running code within a traced context
Implementing Custom Telemetry
To implement your own telemetry integration:
defmodule MyApp.CustomTelemetry do
@behaviour BullMQ.Telemetry.Behaviour
@impl true
def start_span(name, opts) do
# Create and return a span
%{span_id: UUID.uuid4(), name: name}
end
@impl true
def end_span(span, status) do
# End the span and record status
:ok
end
@impl true
def get_current_context() do
# Return current trace context
Process.get(:trace_context, nil)
end
@impl true
def serialize_context(context) do
# Convert context to string for Redis storage
Jason.encode!(context)
end
@impl true
def deserialize_context(metadata) do
# Restore context from Redis
Jason.decode!(metadata)
end
@impl true
def with_context(context, fun) do
# Run function within the given context
Process.put(:trace_context, context)
try do
fun.()
after
Process.delete(:trace_context)
end
end
@impl true
def set_attribute(span, key, value) do
# Add attribute to span
Map.put(span, key, value)
end
@impl true
def add_event(span, name, attributes) do
# Add event to span
:ok
end
@impl true
def record_exception(span, exception, stacktrace) do
# Record exception on span
:ok
end
endUsage with Queue and Worker
# When adding jobs
{:ok, queue} = BullMQ.Queue.start_link(
name: :my_queue,
connection: :redis,
telemetry: MyApp.CustomTelemetry
)
# Trace context is automatically propagated
{:ok, job} = BullMQ.Queue.add(queue, "email", %{to: "user@example.com"})
# When processing jobs, context is restored
{:ok, worker} = BullMQ.Worker.start_link(
name: :my_worker,
queue: "my_queue",
connection: :redis,
telemetry: MyApp.CustomTelemetry,
processor: fn job, _token ->
# This runs within the restored trace context
do_work(job)
end
)
Summary
Types
The trace context that can be propagated across process/service boundaries
A span representing a unit of work in the trace
Span kind indicating the role of the span
Options for starting a span
Span status indicating the outcome
Callbacks
Adds an event to a span.
Deserializes a trace context from a string.
Ends a span.
Gets the current trace context.
Records an exception on a span.
Serializes a trace context to a string.
Sets an attribute on a span.
Starts a new span.
Runs a function within a given trace context.
Types
@type context() :: term()
The trace context that can be propagated across process/service boundaries
@type span() :: term()
A span representing a unit of work in the trace
@type span_kind() :: :internal | :server | :client | :producer | :consumer
Span kind indicating the role of the span
Options for starting a span
@type span_status() :: :ok | :error | {:error, String.t()}
Span status indicating the outcome
Callbacks
Adds an event to a span.
Events are timestamped annotations that represent something happening during the span's lifetime.
Arguments
span- The span to add the event toname- The event nameattributes- Optional attributes for the event (default: %{})
Deserializes a trace context from a string.
Restores a context from its serialized string representation.
Arguments
metadata- The serialized context string
Returns
The restored context.
@callback end_span(span :: span(), status :: span_status()) :: :ok
Ends a span.
Marks the span as complete and records the final status.
Arguments
span- The span to endstatus- The status of the span (:okor{:error, reason})
@callback get_current_context() :: context() | nil
Gets the current trace context.
Returns the active trace context, if any. This context can be serialized and passed to other processes or services.
Returns
The current context, or nil if no context is active.
@callback record_exception( span :: span(), exception :: Exception.t(), stacktrace :: list() ) :: :ok
Records an exception on a span.
Marks the span with exception information. This is typically called when an error occurs during the span's execution.
Arguments
span- The span to record the exception onexception- The exception that occurredstacktrace- The stacktrace at the time of the exception
Serializes a trace context to a string.
Converts the context to a string format suitable for storage in Redis or transmission over the network. The serialization should follow W3C Trace Context format for interoperability.
Arguments
context- The context to serialize
Returns
A string representation of the context.
Sets an attribute on a span.
Adds a key-value attribute to the span. Attributes are used to annotate spans with additional information.
Arguments
span- The span to add the attribute tokey- The attribute key (string or atom)value- The attribute value
Starts a new span.
Creates a new span with the given name and options. The span should be
ended by calling end_span/2.
Arguments
name- The name of the span (e.g., "queue.add", "worker.process")opts- Options for the span::kind- The span kind (:producer,:consumer,:internal, etc.):attributes- Initial attributes for the span:parent- Parent context for linking spans
Returns
A span that can be passed to other functions like end_span/2,
set_attribute/3, etc.
Runs a function within a given trace context.
Sets the given context as the active context for the duration of the function call. This is used to restore context when processing a job that was added with a trace context.
Arguments
context- The context to activatefun- The function to run within the context
Returns
The return value of the function.