KafkaEx.Telemetry (kafka_ex v1.0.0-rc.1)
Telemetry events emitted by KafkaEx.
KafkaEx uses the :telemetry library to emit events that can be used
for observability. This module documents all events and provides
helper functions.
Attaching Handlers
To receive telemetry events, attach a handler:
:telemetry.attach("my-handler", [:kafka_ex, :request, :stop], &MyModule.handle_event/4, nil)
Or attach to multiple events:
:telemetry.attach_many("my-handler", KafkaEx.Telemetry.events(), &MyModule.handle_event/4, nil)
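The calls above reference `MyModule.handle_event/4` without showing it. A minimal sketch of such a handler follows. `MyModule` and `format_request/2` are placeholder names, and the code assumes `duration` is reported in native time units (the convention used by `:telemetry.span/3`), which this page does not state explicitly.

```elixir
defmodule MyModule do
  # Hypothetical handler module; the name matches the placeholder used above.
  require Logger

  # Assumption: :duration is in native time units.
  def handle_event([:kafka_ex, :request, :stop], %{duration: duration}, metadata, _config) do
    Logger.info(format_request(duration, metadata))
  end

  # Catch-all clause: :telemetry detaches a handler that raises,
  # so unexpected events are ignored rather than crashing.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok

  # Builds the log line; kept separate so it can be checked without Logger.
  def format_request(duration, metadata) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "kafka request #{metadata[:operation]} took #{ms}ms"
  end
end
```

The catch-all clause matters mostly with `attach_many/4`, where one function receives several event shapes.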
Events
Request Events
Low-level Kafka protocol request/response events.
- [:kafka_ex, :request, :start] - Emitted when a Kafka protocol request begins
  - Measurements: %{system_time: integer()}
  - Metadata: %{operation: atom(), api_version: integer(), correlation_id: integer(), client_id: binary(), broker: map()}
  - Note: broker is empty (%{}) at start (actual broker determined after send)
- [:kafka_ex, :request, :stop] - Emitted when a Kafka protocol request completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{bytes_sent: integer(), bytes_received: integer(), broker: map()}
    - bytes_sent - Number of bytes sent in the request
    - bytes_received - Number of bytes received in the response (0 on error)
    - broker - Map with node_id, host, port of the broker that handled the request
- [:kafka_ex, :request, :exception] - Emitted when a Kafka protocol request fails
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
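Because `broker` is an empty map on the start event and populated only on stop, a handler shared by both events has to tolerate either shape. The sketch below shows one way to do that. `RequestSummary` and its functions are hypothetical helpers, not part of KafkaEx.

```elixir
defmodule RequestSummary do
  # Hypothetical helper module for request event metadata.

  # Populated broker map, as described for the :stop event.
  def broker_label(%{host: host, port: port}), do: "#{host}:#{port}"

  # Empty map (the :start event) or any other unexpected shape.
  def broker_label(_empty_or_unknown), do: "unknown"

  # Sums both byte counters, treating a missing key as 0 so the
  # function is also safe to call with start-event metadata.
  def total_bytes(metadata) do
    Map.get(metadata, :bytes_sent, 0) + Map.get(metadata, :bytes_received, 0)
  end
end
```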
Connection Events
- [:kafka_ex, :connection, :start] - Emitted when connecting to a broker
  - Measurements: %{system_time: integer()}
  - Metadata: %{host: binary(), port: integer(), ssl: boolean()}
- [:kafka_ex, :connection, :stop] - Emitted when connection is established
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{success: boolean()}
- [:kafka_ex, :connection, :exception] - Emitted when connection fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
- [:kafka_ex, :connection, :close] - Emitted when a connection is closed
  - Measurements: %{count: 1}
  - Metadata: %{host: binary(), port: integer(), reason: atom()}
    - reason - Why the connection closed:
      - :remote_closed - Remote broker closed the connection
      - :timeout - Request timed out
      - :send_error - Error sending data
      - :recv_error - Error receiving data
      - :shutdown - Client shutdown
      - :init_error - Error during client initialization
      - :metadata_update - Broker removed due to cluster topology change
      - :reconnecting - Socket closed before reconnection attempt
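The close reasons listed above can be bucketed before they are turned into metrics or alerts. The grouping in this sketch is an assumption made for illustration (KafkaEx does not define "expected" versus "failure" reasons), and `CloseReasons` is a hypothetical module name.

```elixir
defmodule CloseReasons do
  # Assumed grouping, for illustration only: routine lifecycle closes
  # versus closes that usually indicate a problem.
  @expected [:shutdown, :metadata_update, :reconnecting]
  @failures [:remote_closed, :timeout, :send_error, :recv_error, :init_error]

  def classify(reason) when reason in @expected, do: :expected
  def classify(reason) when reason in @failures, do: :failure

  # Any reason not documented above falls through here.
  def classify(_other), do: :unknown
end
```

A handler for `[:kafka_ex, :connection, :close]` could call `CloseReasons.classify(metadata.reason)` and alert only on `:failure`.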
Authentication Events
- [:kafka_ex, :auth, :start] - Emitted when SASL authentication begins
  - Measurements: %{system_time: integer()}
  - Metadata: %{host: binary(), port: integer(), mechanism: binary()}
    - mechanism - The SASL mechanism name ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER")
- [:kafka_ex, :auth, :stop] - Emitted when SASL authentication completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, error: term()}
    - result - :ok on success, :error on failure
    - error - Error reason (only on failure)
- [:kafka_ex, :auth, :exception] - Emitted when SASL authentication fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
Produce Events
- [:kafka_ex, :produce, :start] - Emitted when a produce operation begins
  - Measurements: %{system_time: integer(), message_count: integer()}
  - Metadata: %{topic: binary(), partition: integer(), client_id: binary(), required_acks: integer()}
- [:kafka_ex, :produce, :stop] - Emitted when a produce operation completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, offset: integer(), error: term()}
    - result - :ok on success, :error on failure
    - offset - Base offset (only on success when acks > 0)
    - error - Error reason (only on failure)
- [:kafka_ex, :produce, :exception] - Emitted when a produce operation fails
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
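Several stop events carry keys that are present only on success or only on failure, such as `offset` and `error` on the produce stop event. The sketch below reads them defensively. It assumes "only on success" may mean the key is absent from the map, which this page does not spell out, and `ProduceOutcome` is a hypothetical module.

```elixir
defmodule ProduceOutcome do
  # Success with a base offset (acks > 0).
  def describe(%{result: :ok, offset: offset}), do: {:ok, offset}

  # Success without an offset, e.g. when no acknowledgement was requested.
  def describe(%{result: :ok}), do: {:ok, nil}

  # Failure; Map.get/2 returns nil if :error happens to be missing.
  def describe(%{result: :error} = metadata), do: {:error, Map.get(metadata, :error)}
end
```

The same clause layout applies to the other stop events that use a `result` key with optional companions.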
Fetch Events
- [:kafka_ex, :fetch, :start] - Emitted when a fetch operation begins
  - Measurements: %{system_time: integer()}
  - Metadata: %{topic: binary(), partition: integer(), offset: integer(), client_id: binary()}
- [:kafka_ex, :fetch, :stop] - Emitted when a fetch operation completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, message_count: integer(), error: term()}
    - result - :ok on success, :error on failure
    - message_count - Number of messages fetched (only on success)
    - error - Error reason (only on failure)
- [:kafka_ex, :fetch, :exception] - Emitted when a fetch operation fails
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
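Every `:exception` event documented here adds the same `kind`, `reason`, and `stacktrace` keys, so one formatter can serve all of them. A minimal sketch using Elixir's standard `Exception.format/3` follows. `ExceptionReport` is a hypothetical module name.

```elixir
defmodule ExceptionReport do
  # Turns the metadata of any :exception event into a printable string.
  # kind is expected to be :error, :exit, or :throw.
  def format(%{kind: kind, reason: reason, stacktrace: stacktrace}) do
    Exception.format(kind, reason, stacktrace)
  end
end
```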
Consumer Events
- [:kafka_ex, :consumer, :commit, :start] - Emitted when an offset commit begins
  - Measurements: %{system_time: integer()}
  - Metadata: %{group_id: binary(), client_id: binary(), topic: binary(), partition_count: integer()}
- [:kafka_ex, :consumer, :commit, :stop] - Emitted when an offset commit completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, error: term()}
    - result - :ok on success, :error on failure
    - error - Error reason (only on failure)
- [:kafka_ex, :consumer, :commit, :exception] - Emitted when an offset commit fails
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
Consumer Group Events
- [:kafka_ex, :consumer, :join, :start] - Emitted when joining a consumer group
  - Measurements: %{system_time: integer()}
  - Metadata: %{group_id: binary(), member_id: binary(), topics: [binary()]}
- [:kafka_ex, :consumer, :join, :stop] - Emitted when join group completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, generation_id: integer(), is_leader: boolean(), error: term()}
- [:kafka_ex, :consumer, :join, :exception] - Emitted when join group fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
- [:kafka_ex, :consumer, :sync, :start] - Emitted when syncing consumer group state
  - Measurements: %{system_time: integer()}
  - Metadata: %{group_id: binary(), member_id: binary(), generation_id: integer(), is_leader: boolean()}
- [:kafka_ex, :consumer, :sync, :stop] - Emitted when sync group completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, assigned_partitions: integer(), error: term()}
- [:kafka_ex, :consumer, :sync, :exception] - Emitted when sync group fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
- [:kafka_ex, :consumer, :heartbeat, :start] - Emitted when sending a heartbeat
  - Measurements: %{system_time: integer()}
  - Metadata: %{group_id: binary(), member_id: binary(), generation_id: integer()}
- [:kafka_ex, :consumer, :heartbeat, :stop] - Emitted when heartbeat completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, error: term()}
- [:kafka_ex, :consumer, :heartbeat, :exception] - Emitted when heartbeat fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
- [:kafka_ex, :consumer, :leave, :start] - Emitted when leaving a consumer group
  - Measurements: %{system_time: integer()}
  - Metadata: %{group_id: binary(), member_id: binary()}
- [:kafka_ex, :consumer, :leave, :stop] - Emitted when leave group completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, error: term()}
- [:kafka_ex, :consumer, :leave, :exception] - Emitted when leave group fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
- [:kafka_ex, :consumer, :rebalance] - Emitted when a consumer group rebalance is triggered
  - Measurements: %{count: 1}
  - Metadata: %{group_id: binary(), member_id: binary(), generation_id: integer(), reason: atom()}
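Event names on this page have either three or four elements, and some (such as the rebalance event) are one-off counters rather than start/stop/exception spans. A generic handler may therefore want to derive a metric name and tell the two kinds apart. The sketch below is one way to do so. `EventNames` is a hypothetical helper module, and the dotted naming scheme is an arbitrary choice for illustration.

```elixir
defmodule EventNames do
  # Joins an event name such as [:kafka_ex, :consumer, :join, :stop]
  # into the string "kafka_ex.consumer.join.stop".
  def metric_name(event) when is_list(event) do
    Enum.map_join(event, ".", &Atom.to_string/1)
  end

  # Returns :start, :stop, or :exception for span-style events, and
  # :single for one-off events (for example rebalance).
  def kind(event) when is_list(event) do
    case List.last(event) do
      phase when phase in [:start, :stop, :exception] -> phase
      _other -> :single
    end
  end
end
```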
Metadata Events
- [:kafka_ex, :metadata, :update, :start] - Emitted when a metadata request begins
  - Measurements: %{system_time: integer()}
  - Metadata: %{client_id: binary(), topics: [binary()]}
- [:kafka_ex, :metadata, :update, :stop] - Emitted when a metadata request completes
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{result: :ok | :error, broker_count: integer(), topic_count: integer(), error: term()}
    - result - :ok on success, :error on failure
    - broker_count - Number of brokers in the cluster (only on success)
    - topic_count - Number of topics in the response (only on success)
    - error - Error reason (only on failure)
- [:kafka_ex, :metadata, :update, :exception] - Emitted when a metadata request fails with exception
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
Consumer Processing Events
- [:kafka_ex, :consumer, :process, :start] - Emitted when GenConsumer begins processing a message batch
  - Measurements: %{system_time: integer(), message_count: integer()}
  - Metadata: %{group_id: binary(), topic: binary(), partition: integer(), consumer_module: binary()}
- [:kafka_ex, :consumer, :process, :stop] - Emitted when GenConsumer finishes processing a message batch
  - Measurements: %{duration: integer()}
  - Metadata: Same as start event plus %{commit_mode: :async_commit | :sync_commit}
    - commit_mode - The commit mode returned by the consumer callback
- [:kafka_ex, :consumer, :process, :exception] - Emitted when GenConsumer message processing fails
  - Measurements: %{duration: integer()}
  - Metadata: Start metadata plus %{kind: atom(), reason: term(), stacktrace: list()}
Functions
@spec auth_events() :: [[atom()]]
Returns authentication-related telemetry events.
@spec connection_events() :: [[atom()]]
Returns connection-related telemetry events.
@spec consumer_events() :: [[atom()]]
Returns consumer-related telemetry events (includes commit and group events).
@spec consumer_group_events() :: [[atom()]]
Returns consumer group lifecycle events (join, sync, heartbeat, leave, rebalance).
@spec consumer_process_events() :: [[atom()]]
Returns consumer message processing telemetry events.
@spec events() :: [[atom()]]
Returns the list of all telemetry events emitted by KafkaEx.
@spec fetch_events() :: [[atom()]]
Returns fetch-related telemetry events.
@spec metadata_events() :: [[atom()]]
Returns metadata update-related telemetry events.
@spec produce_events() :: [[atom()]]
Returns produce-related telemetry events.
@spec request_events() :: [[atom()]]
Returns request-related telemetry events.
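Since each helper returns a list of event names (`[[atom()]]`), the result can be narrowed before it is handed to `:telemetry.attach_many/4`. The sketch below keeps only events of one phase. `EventFilter` is a hypothetical module, and the `sample` list is a stand-in for what a helper such as `fetch_events/0` might return, not its confirmed output.

```elixir
defmodule EventFilter do
  # Keeps only the events whose final element equals the given phase,
  # e.g. :stop to ignore :start and :exception events.
  def only(events, phase) when is_list(events) and is_atom(phase) do
    Enum.filter(events, fn event -> List.last(event) == phase end)
  end
end

# Stand-in data (assumption); in an application this would be the
# return value of one of the helper functions documented above.
sample = [
  [:kafka_ex, :fetch, :start],
  [:kafka_ex, :fetch, :stop],
  [:kafka_ex, :fetch, :exception]
]

IO.inspect(EventFilter.only(sample, :stop))
```

In an application, the filtered list would take the place of `KafkaEx.Telemetry.events()` in the `attach_many/4` call shown under Attaching Handlers.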