View Source Datadog.DataStreams.Integrations.Kafka (Data Streams Ex v1.1.2)
Functions for integrating Kafka tracing with DataStreams.
usage
Usage
Because Elixir does not include a context
grab bag to pass
around, we use the OpenTelemetry context to store the current
DataStreams pathway. If you are not using OpenTelemetry, or
have a special fan in or fan out situation, you can use the
respective functions that take a pathway as an argument.
If you have a basic one message in one message out situation,
and you have OpenTelemetry already covering your application,
you can use the trace_produce/1
and trace_consume/2
functions.
require OpenTelemetry.Tracer, as: Tracer
alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka
@doc "Handles a message from Kafka. Receives a message map with partition, topic, and headers."
@spec handle_message(map()) :: :ok
def handle_message(message) do
# NOTE: This does not add the recommended Kafka span attributes.
Tracer.with_span "#{message.topic} process" do
DataStreamsKafka.trace_consume(message, "my_consumer_group")
# Do work
Tracer.with_span "#{new_message.topic} produce" do
new_message
|> DataStreamsKafka.trace_produce()
|> send_to_kafka()
end
end
end
Link to this section Summary
Types
A general map that contains the topic, partition, and headers atoms. This
matches the format of Elsa.elsa_message
by default
(and will work out of the box), though will need the topic and partition
added if you are using standard :brod
(or kpro
).
Functions
Traces a Kafka message being consumed. Requires the current Kafka
consumer group. Uses the pathway in the current
Datadog.DataStreams.Context
.
Traces a Kafka message being consumed. Requires the current Kafka consumer group.
Traces a Kafka message being produced. Uses the pathway in the
current Datadog.DataStreams.Context
. Returns a new message with
the pathway encoded in the header values.
Traces a Kafka message being produced. Returns the new message with the pathway encoded in the header values, as well as the new pathway.
Link to this section Types
@type message() :: map()
A general map that contains the topic, partition, and headers atoms. This
matches the format of Elsa.elsa_message
by default
(and will work out of the box), though will need the topic and partition
added if you are using standard :brod
(or kpro
).
Link to this section Functions
@spec trace_consume(message(), String.t()) :: Datadog.DataStreams.Pathway.t()
Traces a Kafka message being consumed. Requires the current Kafka
consumer group. Uses the pathway in the current
Datadog.DataStreams.Context
.
@spec trace_consume_with_pathway( Datadog.DataStreams.Pathway.t() | nil, message(), String.t() ) :: Datadog.DataStreams.Pathway.t()
Traces a Kafka message being consumed. Requires the current Kafka consumer group.
Do not pass the resulting pathway from this function to another call
of trace_consume_with_pathway
, as it will modify the pathway incorrectly.
@spec trace_produce(msg) :: msg when msg: message()
Traces a Kafka message being produced. Uses the pathway in the
current Datadog.DataStreams.Context
. Returns a new message with
the pathway encoded in the header values.
@spec trace_produce_with_pathway(Datadog.DataStreams.Pathway.t() | nil, msg) :: {msg, Datadog.DataStreams.Pathway.t()} when msg: message()
Traces a Kafka message being produced. Returns the new message with the pathway encoded in the header values, as well as the new pathway.