Datadog.DataStreams.Integrations.Kafka (Data Streams Ex v1.1.2)

Functions for integrating Kafka tracing with DataStreams.

Usage

Because Elixir has no built-in context object to pass around, we use the OpenTelemetry context to store the current DataStreams pathway. If you are not using OpenTelemetry, or you have a fan-in or fan-out situation, use the functions that take a pathway as an argument: trace_consume_with_pathway/3 and trace_produce_with_pathway/2.

If you have a basic one-message-in, one-message-out situation, and OpenTelemetry already covers your application, you can use the trace_produce/1 and trace_consume/2 functions.

require OpenTelemetry.Tracer, as: Tracer

alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

@doc "Handles a message from Kafka. Receives a message map with partition, topic, and headers."
@spec handle_message(map()) :: :ok
def handle_message(message) do
  # NOTE: This does not add the recommended Kafka span attributes.
  Tracer.with_span "#{message.topic} process" do
    DataStreamsKafka.trace_consume(message, "my_consumer_group")

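    # Do work that builds the outgoing message.
    # do_work/1 is a hypothetical helper standing in for your own logic.
    new_message = do_work(message)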

    Tracer.with_span "#{new_message.topic} produce" do
      new_message
      |> DataStreamsKafka.trace_produce()
      |> send_to_kafka()
    end
  end
end
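For the fan-out case mentioned above, a minimal sketch might use the pathway-taking functions directly. The module name FanOutExample and the injectable `kafka` argument are hypothetical and are there only so the sketch can be run against a stub. Passing nil as the initial pathway is allowed by the spec, but its exact meaning is an assumption here.

```elixir
defmodule FanOutExample do
  @moduledoc "Hypothetical fan-out handler: one consumed message, many produced."

  # `kafka` defaults to the real integration module. It is a parameter only so
  # the sketch can be exercised with a stub module.
  def fan_out(message, consumer_group, outgoing, kafka \\ Datadog.DataStreams.Integrations.Kafka) do
    # Assumption: nil means "no pathway in hand yet".
    pathway = kafka.trace_consume_with_pathway(nil, message, consumer_group)

    # Every outgoing message continues from the same consumed pathway.
    # The pathway is only handed to the produce function, never to another
    # trace_consume_with_pathway call.
    Enum.map(outgoing, fn out ->
      {traced, _next_pathway} = kafka.trace_produce_with_pathway(pathway, out)
      traced
    end)
  end
end
```

The returned list holds the messages with the pathway encoded in their headers, ready to hand to whatever producer you use.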

Summary

Types

message()

A general map that contains the topic, partition, and headers keys. This matches the format of Elsa.elsa_message by default (and will work out of the box), though you will need to add the topic and partition if you are using standard :brod (or kpro).

Functions

trace_consume(message, consumer_group)

Traces a Kafka message being consumed. Requires the current Kafka consumer group. Uses the pathway in the current Datadog.DataStreams.Context.

trace_consume_with_pathway(pathway, message, consumer_group)

Traces a Kafka message being consumed. Requires the current Kafka consumer group.

trace_produce(message)

Traces a Kafka message being produced. Uses the pathway in the current Datadog.DataStreams.Context. Returns a new message with the pathway encoded in the header values.

trace_produce_with_pathway(pathway, message)

Traces a Kafka message being produced. Returns the new message with the pathway encoded in the header values, as well as the new pathway.

Types

@type message() :: map()

A general map that contains the topic, partition, and headers keys. This matches the format of Elsa.elsa_message by default (and will work out of the box), though you will need to add the topic and partition if you are using standard :brod (or kpro).
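If your consumer hands you a map without the topic and partition, a small helper can add them before tracing. This is only a sketch: MessageShape and normalize/3 are hypothetical names, and defaulting missing headers to an empty list assumes headers are a list of key/value tuples.

```elixir
defmodule MessageShape do
  @moduledoc "Hypothetical helper that adds the keys a message() map needs."

  @spec normalize(map(), String.t(), non_neg_integer()) :: map()
  def normalize(message, topic, partition) when is_map(message) do
    message
    |> Map.put(:topic, topic)
    |> Map.put(:partition, partition)
    # Keep existing headers; fall back to an empty list when none are present.
    |> Map.put_new(:headers, [])
  end
end
```

The result can then be passed to trace_consume/2 or trace_produce/1 like any other message map.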

Functions

trace_consume(message, consumer_group)

@spec trace_consume(message(), String.t()) :: Datadog.DataStreams.Pathway.t()

Traces a Kafka message being consumed. Requires the current Kafka consumer group. Uses the pathway in the current Datadog.DataStreams.Context.

trace_consume_with_pathway(pathway, message, consumer_group)

@spec trace_consume_with_pathway(
  Datadog.DataStreams.Pathway.t() | nil,
  message(),
  String.t()
) ::
  Datadog.DataStreams.Pathway.t()

Traces a Kafka message being consumed. Requires the current Kafka consumer group.

Do not pass the resulting pathway from this function to another call of trace_consume_with_pathway, as it will modify the pathway incorrectly.

trace_produce(message)

@spec trace_produce(msg) :: msg when msg: message()

Traces a Kafka message being produced. Uses the pathway in the current Datadog.DataStreams.Context. Returns a new message with the pathway encoded in the header values.

trace_produce_with_pathway(pathway, message)

@spec trace_produce_with_pathway(Datadog.DataStreams.Pathway.t() | nil, msg) ::
  {msg, Datadog.DataStreams.Pathway.t()}
when msg: message()

Traces a Kafka message being produced. Returns the new message with the pathway encoded in the header values, as well as the new pathway.