Datadog.DataStreams.Integrations.Kafka (Data Streams Ex v1.2.2)

Functions for integrating Kafka tracing with DataStreams.

Usage

Because Elixir has no built-in context object to pass around, we use the OpenTelemetry context to store the current DataStreams pathway. If you are not using OpenTelemetry, or have a special fan-in or fan-out situation, you can use the corresponding functions that take a pathway as an argument.

If you have a basic one-message-in, one-message-out situation, and OpenTelemetry already covers your application, you can use the trace_produce/1 and trace_consume/2 functions.

require OpenTelemetry.Tracer, as: Tracer

alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

@doc "Handles a message from Kafka. Receives a message map with partition, topic, and headers."
@spec handle_message(map()) :: :ok
def handle_message(message) do
  # NOTE: This does not add the recommended Kafka span attributes.
  Tracer.with_span "#{message.topic} process" do
    DataStreamsKafka.trace_consume(message, "my_consumer_group")

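    # Do work and build the outgoing message. build_new_message/1 is a
    # placeholder for your own logic.
    new_message = build_new_message(message)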

    Tracer.with_span "#{new_message.topic} produce" do
      new_message
      |> DataStreamsKafka.trace_produce()
      |> send_to_kafka()
    end
  end
end
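
For the fan-out case mentioned above, a minimal sketch using the pathway-taking variants might look like the following. The FanOut module and its injectable kafka argument (which defaults to this module so that it can be stubbed in tests) are illustrative and not part of the library. Only the trace_consume_with_pathway/3 and trace_produce_with_pathway/2 signatures are taken from the specs in this page. Passing nil as the incoming pathway is allowed by the spec, but whether that is appropriate depends on where your upstream pathway lives, so treat it as an assumption.

```elixir
defmodule FanOut do
  # Hypothetical helper, not part of Data Streams Ex.
  @kafka Datadog.DataStreams.Integrations.Kafka

  @doc """
  Records one consume checkpoint, then derives every outgoing message from
  that same consumed pathway. Returns the outgoing messages with headers set.
  """
  def trace(message, outgoing, consumer_group, kafka \\ @kafka) do
    # `nil` is allowed by the spec; assumed here to mean "no pathway in hand".
    consumed = kafka.trace_consume_with_pathway(nil, message, consumer_group)

    Enum.map(outgoing, fn out ->
      # Each branch starts from the consumed pathway. The pathway returned
      # here belongs to that branch only, so it is dropped.
      {traced, _branch_pathway} = kafka.trace_produce_with_pathway(consumed, out)
      traced
    end)
  end
end
```

Starting every branch from the same consumed pathway, instead of threading the pathway returned by one produce into the next, keeps the branches independent of each other.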

Types

@type message() :: map()

A general map that contains the topic, partition, and headers keys (as atoms). This matches the format of Elsa.elsa_message by default (and will work out of the box). If you are using standard :brod (or kpro), you will need to add the topic and partition yourself.
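
As an illustration of adding the topic and partition for :brod users, the sketch below builds such a map from a :brod message tuple. It assumes the kafka_message record has the shape {:kafka_message, offset, key, value, ts_type, ts, headers}; verify this against the :brod / kpro version you use. The BrodMessage module is hypothetical and not part of this library.

```elixir
defmodule BrodMessage do
  # Hypothetical helper, not part of Data Streams Ex.

  @doc """
  Builds a map with the `:topic`, `:partition`, and `:headers` keys from a
  `:brod` message tuple plus the topic and partition it was fetched from.
  """
  def to_message(
        # Assumed record layout; check it against your :brod / kpro version.
        {:kafka_message, offset, key, value, _ts_type, _ts, headers},
        topic,
        partition
      ) do
    %{
      topic: topic,
      partition: partition,
      offset: offset,
      key: key,
      value: value,
      headers: headers
    }
  end
end
```

The resulting map can then be passed wherever a message() is expected.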

Functions

trace_consume(message, consumer_group)

@spec trace_consume(message(), String.t()) :: Datadog.DataStreams.Pathway.t()

Traces a Kafka message being consumed. Requires the current Kafka consumer group. Uses the pathway in the current Datadog.DataStreams.Context.

trace_consume_with_pathway(pathway, message, consumer_group)

@spec trace_consume_with_pathway(
  Datadog.DataStreams.Pathway.t() | nil,
  message(),
  String.t()
) ::
  Datadog.DataStreams.Pathway.t()

Traces a Kafka message being consumed. Requires the current Kafka consumer group.

Do not pass the resulting pathway from this function to another call of trace_consume_with_pathway, as it will modify the pathway incorrectly.

trace_produce(message)

@spec trace_produce(msg) :: msg when msg: message()

Traces a Kafka message being produced. Uses the pathway in the current Datadog.DataStreams.Context. Returns a new message with the pathway encoded in the header values.

trace_produce_with_pathway(pathway, message)

@spec trace_produce_with_pathway(Datadog.DataStreams.Pathway.t() | nil, msg) ::
  {msg, Datadog.DataStreams.Pathway.t()}
when msg: message()

Traces a Kafka message being produced. Returns the new message with the pathway encoded in the header values, as well as the new pathway.

track_consume(group, topic, partition, offset)

@spec track_consume(String.t(), String.t(), non_neg_integer(), integer()) :: :ok

Tracks Kafka consume events for a consumer group via their offset. This is used by Datadog to calculate the lag without requiring the consumer to be on and reading trace headers.

track_produce(topic, partition, offset)

@spec track_produce(String.t(), non_neg_integer(), integer()) :: :ok

Tracks Kafka produce events via their offset. This is used by Datadog to calculate the lag without requiring the consumer to be on and reading trace headers.
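
To make the argument order of the two offset-tracking functions concrete, here is a small sketch that wraps them. The LagTracking module, its function names, and the suggested call sites (after a broker acknowledgement and after an offset has been processed) are assumptions for illustration, not guidance from the library. The kafka argument defaults to this module and exists only so the calls can be stubbed in tests.

```elixir
defmodule LagTracking do
  # Hypothetical helper, not part of Data Streams Ex.
  @kafka Datadog.DataStreams.Integrations.Kafka

  @doc "Call once the broker has acknowledged a produce and returned its offset."
  def produced(topic, partition, offset, kafka \\ @kafka) do
    kafka.track_produce(topic, partition, offset)
  end

  @doc "Call once the consumer group has processed (or committed) an offset."
  def consumed(group, topic, partition, offset, kafka \\ @kafka) do
    # Note the consumer group comes first, ahead of topic and partition.
    kafka.track_consume(group, topic, partition, offset)
  end
end
```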