Datadog.DataStreams.Integrations.Kafka (Data Streams Ex v1.2.2)
Functions for integrating Kafka tracing with DataStreams.
Usage
Because Elixir does not include a context grab bag to pass around, we use the OpenTelemetry context to store the current DataStreams pathway. If you are not using OpenTelemetry, or have a special fan-in or fan-out situation, you can use the respective functions that take a pathway as an argument.
If you have a basic one-message-in, one-message-out situation, and OpenTelemetry already covers your application, you can use the trace_produce/1 and trace_consume/2 functions.
require OpenTelemetry.Tracer, as: Tracer
alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

@doc "Handles a message from Kafka. Receives a message map with partition, topic, and headers."
@spec handle_message(map()) :: :ok
def handle_message(message) do
  # NOTE: This does not add the recommended Kafka span attributes.
  Tracer.with_span "#{message.topic} process" do
    # Records the consume checkpoint on the pathway stored in the current context.
    DataStreamsKafka.trace_consume(message, "my_consumer_group")

    # Do work. `build_reply/1` is a placeholder for your own logic that
    # builds the outgoing message map.
    new_message = build_reply(message)

    Tracer.with_span "#{new_message.topic} produce" do
      # `send_to_kafka/1` is likewise a placeholder for your own producer call.
      new_message
      |> DataStreamsKafka.trace_produce()
      |> send_to_kafka()
    end
  end
end
Types
@type message() :: map()
A general map that contains the topic, partition, and headers atom keys. This matches the format of Elsa.elsa_message by default (and will work out of the box), though you will need to add the topic and partition if you are using standard :brod (or kpro).
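As a rough illustration of adding the topic and partition for :brod users, the sketch below builds a message map from a raw record. The module name `BrodMessageAdapter` and the extra keys are hypothetical and not part of Data Streams Ex. It also assumes the kpro `kafka_message` record layout `{:kafka_message, offset, key, value, ts_type, ts, headers}`, so check that against the version you run.

```elixir
defmodule BrodMessageAdapter do
  @moduledoc """
  Hypothetical helper, not part of Data Streams Ex. Turns a raw
  `kafka_message` record into a map carrying the `:topic`,
  `:partition`, and `:headers` keys.
  """

  # Assumed record layout (from kpro):
  #   {:kafka_message, offset, key, value, ts_type, ts, headers}
  @spec to_message(String.t(), non_neg_integer(), tuple()) :: map()
  def to_message(
        topic,
        partition,
        {:kafka_message, offset, key, value, _ts_type, ts, headers}
      ) do
    %{
      # The record does not carry these two, so the caller supplies them.
      topic: topic,
      partition: partition,
      # Copied from the record unchanged.
      offset: offset,
      key: key,
      value: value,
      timestamp: ts,
      headers: headers
    }
  end
end
```

The topic and partition are usually known to the subscriber callback that receives the record, which is why they are plain arguments here.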
Functions
@spec trace_consume(message(), String.t()) :: Datadog.DataStreams.Pathway.t()
Traces a Kafka message being consumed. Requires the current Kafka consumer group. Uses the pathway in the current Datadog.DataStreams.Context.
@spec trace_consume_with_pathway(Datadog.DataStreams.Pathway.t() | nil, message(), String.t()) :: Datadog.DataStreams.Pathway.t()
Traces a Kafka message being consumed. Requires the current Kafka consumer group.
Do not pass the resulting pathway from this function to another call of trace_consume_with_pathway, as it will modify the pathway incorrectly.
@spec trace_produce(msg) :: msg when msg: message()
Traces a Kafka message being produced. Uses the pathway in the current Datadog.DataStreams.Context. Returns a new message with the pathway encoded in the header values.
@spec trace_produce_with_pathway(Datadog.DataStreams.Pathway.t() | nil, msg) :: {msg, Datadog.DataStreams.Pathway.t()} when msg: message()
Traces a Kafka message being produced. Returns the new message with the pathway encoded in the header values, as well as the new pathway.
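For a fan-out situation, one possible approach is to trace every outgoing message against the same parent pathway. The sketch below is a minimal illustration under that assumption. The module `PathwayFanOut` and its `fan_out/3` function are hypothetical; the only library call used is trace_produce_with_pathway/2, passed as an overridable default so the helper can be exercised without a running pipeline.

```elixir
defmodule PathwayFanOut do
  @moduledoc "Hypothetical fan-out helper; not part of Data Streams Ex."

  alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

  @doc """
  Traces each outgoing message against the same parent `pathway` and
  returns the traced messages in their original order.

  `produce` defaults to `trace_produce_with_pathway/2` and must return
  a `{message, pathway}` tuple, as the spec above describes.
  """
  def fan_out(pathway, outgoing, produce \\ &DataStreamsKafka.trace_produce_with_pathway/2) do
    Enum.map(outgoing, fn message ->
      # The per-message pathway is discarded here; keep it if a later
      # step needs to continue from that branch.
      {traced, _branch_pathway} = produce.(pathway, message)
      traced
    end)
  end
end
```

A caller would typically obtain `pathway` from trace_consume_with_pathway/3 and then hand the returned messages to its own producer.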
@spec track_consume(String.t(), String.t(), non_neg_integer(), integer()) :: :ok
Tracks Kafka consume events via their offset. This is used by Datadog to calculate the lag without requiring the consumer to be on and reading trace headers.
@spec track_produce(String.t(), non_neg_integer(), integer()) :: :ok
Tracks Kafka produce events via their offset. This is used by Datadog to calculate the lag without requiring the consumer to be on and reading trace headers.
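As a sketch of where such a call might sit, the helper below records the offset once a produce call has reported success. The module `ProduceOffsetTracking` is hypothetical, and the argument order (topic, partition, offset) is an assumption inferred from the spec types rather than something stated on this page. The `{:ok, offset}` shape is likewise assumed to come from your producer client.

```elixir
defmodule ProduceOffsetTracking do
  @moduledoc "Hypothetical helper; not part of Data Streams Ex."

  alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

  # `track` defaults to track_produce/3 and can be swapped out in tests.
  # Assumed argument order: topic, partition, offset.
  def record(topic, partition, produce_result, track \\ &DataStreamsKafka.track_produce/3)

  # On success, report the offset and pass the result through unchanged.
  def record(topic, partition, {:ok, offset}, track) when is_integer(offset) do
    :ok = track.(topic, partition, offset)
    {:ok, offset}
  end

  # On failure there is no offset to report, so nothing is tracked.
  def record(_topic, _partition, {:error, _reason} = error, _track), do: error
end
```

Passing the produce result through unchanged keeps the helper easy to place at the end of an existing pipeline.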