View Source KafkaEx.API (kafka_ex v1.0.0-rc.1)
The primary Kafka client API for KafkaEx v1.0+
This module provides two ways to interact with Kafka:
Direct API Usage
All functions take a client pid as the first argument:
{:ok, client} = KafkaEx.API.start_client()
# Produce a message
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, [%{value: "hello"}])
# Fetch messages
{:ok, fetch_result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)
Using via use KafkaEx.API
You can also use this module in your own module to get API functions
that automatically use a configured client:
defmodule MyApp.Kafka do
use KafkaEx.API
# Override client/0 to return your client pid
def client do
MyApp.KafkaClient # a named client
end
end
# Now call without passing client:
{:ok, metadata} = MyApp.Kafka.produce("my-topic", 0, [%{value: "hello"}])
Options for use
:client- A module, atom, or function that returns the client pid. If not provided, you must implementclient/0in your module.
Example with static client name:
defmodule MyApp.Kafka do
use KafkaEx.API, client: MyApp.KafkaClient
endExample with dynamic client lookup:
defmodule MyApp.Kafka do
use KafkaEx.API
def client do
Process.whereis(:my_kafka_client) || raise "Client not started"
end
end
Summary
Functions
Fetch API versions supported by the Kafka broker.
Returns a child specification for starting a client under a supervisor.
Returns the cluster metadata from the given client.
Commit offsets for a consumer group.
Returns the current correlation id for the given client.
Creates a single topic with the given configuration.
Creates one or more topics in the Kafka cluster.
Deletes a single topic from the Kafka cluster.
Deletes topics from the Kafka cluster.
Sends a request to describe a group identified by its name.
Fetch the earliest offset for a given partition.
Fetch records from a Kafka topic partition.
Fetch all available records from a topic partition.
Fetch committed offsets for a consumer group.
Find the coordinator broker for a consumer group or transaction.
Send a heartbeat to a consumer group coordinator.
Join a consumer group.
Fetch the latest offset for a given partition.
Leave a consumer group.
Returns list of Offsets per topic per partition.
Fetch metadata from Kafka brokers.
Fetch metadata for specific topics.
Produce messages to a Kafka topic partition.
Produce a single message to a Kafka topic partition.
Set the consumer group name that will be used by the given client for autocommit.
Start a new Kafka client.
Synchronize consumer group state.
Get topic metadata for the given topics.
Types
@type client() :: GenServer.server()
@type consumer_group_name() :: KafkaEx.Support.Types.consumer_group_name()
@type correlation_id() :: non_neg_integer()
@type error_atom() :: atom()
@type generation_id() :: non_neg_integer()
@type member_id() :: binary()
@type node_id() :: non_neg_integer()
@type offset_val() :: KafkaEx.Support.Types.offset()
@type opts() :: Keyword.t()
@type partition_id() :: KafkaEx.Support.Types.partition()
@type partition_id_request() :: %{partition_num: partition_id()}
@type partition_offset_commit_request() :: %{ partition_num: partition_id(), offset: offset_val() }
@type partition_offset_request() :: %{ partition_num: partition_id(), timestamp: timestamp_request() }
@type timestamp_request() :: KafkaEx.Support.Types.timestamp_request()
@type topic_name() :: KafkaEx.Support.Types.topic()
Functions
@spec api_versions(client(), opts()) :: {:ok, KafkaEx.Messages.ApiVersions.t()} | {:error, error_atom()}
Fetch API versions supported by the Kafka broker.
Queries the broker to discover which Kafka API versions it supports. This enables the client to negotiate compatible API versions for all operations.
The response includes the minimum and maximum supported versions for each Kafka API, identified by their API key.
Examples
{:ok, versions} = KafkaEx.API.api_versions(client)
@spec child_spec(opts()) :: Supervisor.child_spec()
Returns a child specification for starting a client under a supervisor.
Examples
children = [
{KafkaEx.API, name: MyApp.KafkaClient, brokers: [{"localhost", 9092}]}
]
Supervisor.start_link(children, strategy: :one_for_one)
@spec cluster_metadata(client()) :: {:ok, KafkaEx.Cluster.ClusterMetadata.t()}
Returns the cluster metadata from the given client.
This returns the cluster metadata currently cached in the client's state.
It does not make a network request to Kafka. If you need fresh metadata,
use metadata/2 or metadata/3 instead.
Examples
{:ok, cluster_metadata} = KafkaEx.API.cluster_metadata(client)
commit_offset(client, consumer_group, topic, partitions, opts \\ [])
View Source@spec commit_offset( client(), consumer_group_name(), topic_name(), [partition_offset_commit_request()], opts() ) :: {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, error_atom()}
Commit offsets for a consumer group.
Stores the committed offsets for the specified topic/partition(s) for a consumer group. This is used for manual offset management and consumer group coordination.
Examples
partitions = [%{partition_num: 0, offset: 100}]
{:ok, result} = KafkaEx.API.commit_offset(client, "my-group", "my-topic", partitions)
@spec correlation_id(client()) :: {:ok, correlation_id()}
Returns the current correlation id for the given client.
@spec create_topic(client(), topic_name(), opts()) :: {:ok, KafkaEx.Messages.CreateTopics.t()} | {:error, error_atom()}
Creates a single topic with the given configuration.
This is a convenience function that wraps create_topics/4 for creating a single topic.
Parameters
client- The client pidtopic_name- The name of the topic to createopts- Keyword list with::num_partitions- Number of partitions (default: -1 for broker default):replication_factor- Replication factor (default: -1 for broker default):config_entries- Topic configuration entries (default: []):timeout- Request timeout in milliseconds (default: 10_000):validate_only- If true, only validate (V1+, default: false):api_version- API version to use (default: 1)
Examples
# Create with defaults
{:ok, result} = KafkaEx.API.create_topic(client, "my-topic")
# Create with specific configuration
{:ok, result} = KafkaEx.API.create_topic(client, "my-topic",num_partitions: 6, replication_factor: 3, timeout: 30_000)
@spec create_topics(client(), list(), non_neg_integer(), opts()) :: {:ok, KafkaEx.Messages.CreateTopics.t()} | {:error, error_atom()}
Creates one or more topics in the Kafka cluster.
Parameters
client- The client pidtopics- List of topic configurations, each being a keyword list or map with::topic- Topic name (required):num_partitions- Number of partitions (default: -1 for broker default):replication_factor- Replication factor (default: -1 for broker default):replica_assignment- Manual replica assignment (default: []):config_entries- Topic configuration entries as{name, value}tuples (default: [])
timeout- Request timeout in milliseconds (required)opts- Optional keyword list with::validate_only- If true, only validate without creating topics (V1+, default: false):api_version- API version to use (default: 1)
Examples
Create a topic with broker defaults
= KafkaEx.API.create_topics(client, [[topic: "my-topic"]], 10_000)
Create a topic with custom configuration
= KafkaEx.API.create_topics(client, [ [
topic: "my-topic", num_partitions: 3, replication_factor: 2, config_entries: [ {"cleanup.policy", "compact"}, {"retention.ms", "86400000"} ]] ], 10_000)
Validate only (V1+)
= KafkaEx.API.create_topics(client, [ [topic: "my-topic", num_partitions: 3] ], 10_000, validate_only: true)
Check results
if CreateTopics.success?(result) do IO.puts("All topics created successfully") else for failed <- CreateTopics.failed_topics(result) do
IO.puts("Failed to create #{failed.topic}: #{failed.error}")end end
@spec delete_topic(client(), topic_name(), opts()) :: {:ok, KafkaEx.Messages.DeleteTopics.t()} | {:error, error_atom()}
Deletes a single topic from the Kafka cluster.
This is a convenience function that wraps delete_topics/4 for deleting a single topic.
Parameters
client- The client pidtopic_name- The name of the topic to deleteopts- Keyword list with::timeout- Request timeout in milliseconds (default: 30_000):api_version- API version to use (default: 1)
Examples
# Delete with defaults
{:ok, result} = KafkaEx.API.delete_topic(client, "my-topic")
# Delete with custom timeout
{:ok, result} = KafkaEx.API.delete_topic(client, "my-topic", timeout: 60_000)
@spec delete_topics(client(), [topic_name()], timeout(), opts()) :: {:ok, KafkaEx.Messages.DeleteTopics.t()} | {:error, error_atom()}
Deletes topics from the Kafka cluster.
Parameters
client- The client pidtopics- List of topic names to deletetimeout- Request timeout in millisecondsopts- Keyword list with::api_version- API version to use (default: 1)
Returns
{:ok, DeleteTopics.t()}- Result with status for each topic{:error, error_atom}- Error if the request failed
Examples
# Delete a single topic
{:ok, result} = KafkaEx.API.delete_topics(client, ["my-topic"], 30_000)
# Delete multiple topics
{:ok, result} = KafkaEx.API.delete_topics(client, ["topic1", "topic2", "topic3"], 30_000)
# Check results
if DeleteTopics.success?(result) do
IO.puts("All topics deleted successfully")
else
failed = DeleteTopics.failed_topics(result)
IO.inspect(failed, label: "Failed to delete")
end
@spec describe_group(client(), consumer_group_name(), opts()) :: {:ok, KafkaEx.Messages.ConsumerGroupDescription.t()} | {:error, any()}
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't group requests by group coordinator.
Examples
{:ok, description} = KafkaEx.API.describe_group(client, "my-group")
@spec earliest_offset(client(), topic_name(), partition_id(), opts()) :: {:ok, offset_val()} | {:error, error_atom()}
Fetch the earliest offset for a given partition.
Examples
{:ok, offset} = KafkaEx.API.earliest_offset(client, "my-topic", 0)
@spec fetch(client(), topic_name(), partition_id(), offset_val(), opts()) :: {:ok, KafkaEx.Messages.Fetch.t()} | {:error, error_atom()}
Fetch records from a Kafka topic partition.
Retrieves records from the specified topic and partition starting from the given offset. Returns the fetched records along with metadata about the fetch (high watermark, etc).
Options
:max_bytes- Maximum bytes to fetch per partition (default: 1,000,000):max_wait_time- Maximum time to wait for records in ms (default: 10,000):min_bytes- Minimum bytes to accumulate before returning (default: 1):isolation_level- 0 for READ_UNCOMMITTED, 1 for READ_COMMITTED (V4+, default: 0):api_version- API version to use (default: 3)
Examples
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)
messages = result.records
@spec fetch_all(client(), topic_name(), partition_id(), opts()) :: {:ok, KafkaEx.Messages.Fetch.t()} | {:error, error_atom()}
Fetch all available records from a topic partition.
Convenience function that fetches records from the earliest offset up to the high watermark. Useful for reading all data in a partition (within the configured max_bytes limit).
Examples
{:ok, result} = KafkaEx.API.fetch_all(client, "my-topic", 0)
fetch_committed_offset(client, consumer_group, topic, partitions, opts \\ [])
View Source@spec fetch_committed_offset( client(), consumer_group_name(), topic_name(), [partition_id_request()], opts() ) :: {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, error_atom()}
Fetch committed offsets for a consumer group.
Retrieves the last committed offsets for the specified topic/partition(s) for a consumer group. This is useful for tracking consumer group progress and implementing manual offset management.
Examples
partitions = [%{partition_num: 0}]
{:ok, offsets} = KafkaEx.API.fetch_committed_offset(client, "my-group", "my-topic", partitions)
@spec find_coordinator(client(), consumer_group_name(), opts()) :: {:ok, KafkaEx.Messages.FindCoordinator.t()} | {:error, error_atom()}
Find the coordinator broker for a consumer group or transaction.
Discovers which broker is the coordinator for the specified consumer group (or transactional producer). This is used internally for consumer group operations but can also be called directly.
Options
:coordinator_type- 0 for group (default), 1 for transaction:api_version- API version to use (default: 1)
Examples
# Find group coordinator
{:ok, coordinator} = KafkaEx.API.find_coordinator(client, "my-consumer-group")
# Find transaction coordinator
{:ok, coordinator} = KafkaEx.API.find_coordinator(client, "my-transactional-id",
coordinator_type: :transaction)
heartbeat(client, consumer_group, member_id, generation_id, opts \\ [])
View Source@spec heartbeat(client(), consumer_group_name(), member_id(), generation_id(), opts()) :: {:ok, KafkaEx.Messages.Heartbeat.t()} | {:error, error_atom()}
Send a heartbeat to a consumer group coordinator.
Sends a periodic heartbeat to the group coordinator to indicate that the consumer is still active and participating in the consumer group. This is required to maintain group membership and prevent rebalancing.
Examples
{:ok, result} = KafkaEx.API.heartbeat(client, "my-group", member_id, generation_id)
@spec join_group(client(), consumer_group_name(), member_id(), opts()) :: {:ok, KafkaEx.Messages.JoinGroup.t()} | {:error, error_atom()}
Join a consumer group.
Initiates the consumer group join protocol. This is the first step in the consumer group coordination protocol. The consumer sends its metadata (topics it's interested in) and receives back group membership information including the generation ID and member ID.
If this member is elected as the group leader, it will receive the metadata
of all members and is responsible for computing partition assignments (via
sync_group/5).
API Version Differences
- V0: Basic JoinGroup with session_timeout
- V1: Adds separate rebalance_timeout field
- V2: Adds throttle_time_ms to response
Examples
{:ok, result} = KafkaEx.API.join_group(client, "my-group", "")
@spec latest_offset(client(), topic_name(), partition_id(), opts()) :: {:ok, offset_val()} | {:error, error_atom()}
Fetch the latest offset for a given partition.
Examples
{:ok, offset} = KafkaEx.API.latest_offset(client, "my-topic", 0)
@spec leave_group(client(), consumer_group_name(), member_id(), opts()) :: {:ok, :no_error | KafkaEx.Messages.LeaveGroup.t()} | {:error, error_atom()}
Leave a consumer group.
Notifies the group coordinator that a consumer is voluntarily leaving the consumer group. This allows the coordinator to immediately trigger a rebalance without waiting for a session timeout, improving rebalance latency.
Examples
{:ok, result} = KafkaEx.API.leave_group(client, "my-group", member_id)
@spec list_offsets(client(), [{topic_name(), [partition_offset_request()]}], opts()) :: {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, any()}
Returns list of Offsets per topic per partition.
We support only one topic partition pair for now, as we don't request by leader.
Examples
partitions = [%{partition_num: 0, timestamp: :latest}]
{:ok, offsets} = KafkaEx.API.list_offsets(client, [{"my-topic", partitions}])
@spec metadata(client(), opts()) :: {:ok, KafkaEx.Cluster.ClusterMetadata.t()} | {:error, error_atom()}
Fetch metadata from Kafka brokers.
Makes a network request to fetch fresh metadata from Kafka. This updates the client's internal cluster metadata state and returns the updated metadata.
By default, fetches metadata for all topics in the cluster. To fetch metadata
for specific topics only, use metadata/3.
Examples
{:ok, metadata} = KafkaEx.API.metadata(client)
{:ok, metadata} = KafkaEx.API.metadata(client, api_version: 2)
@spec metadata(client(), [topic_name()] | nil, opts()) :: {:ok, KafkaEx.Cluster.ClusterMetadata.t()} | {:error, error_atom()}
Fetch metadata for specific topics.
Makes a network request to fetch fresh metadata for the specified topics only. This is more efficient than fetching all topics if you only need information about a subset of topics.
Examples
{:ok, metadata} = KafkaEx.API.metadata(client, ["topic1", "topic2"], [])
@spec produce(client(), topic_name(), partition_id() | nil, [map()], opts()) :: {:ok, KafkaEx.Messages.RecordMetadata.t()} | {:error, error_atom()}
Produce messages to a Kafka topic partition.
Sends one or more messages to the specified topic and partition. Returns the base offset assigned to the first message in the batch.
Parameters
client- The client pidtopic- Topic namepartition- Partition number, ornilto use partitionermessages- List of message maps with:value, optional:key,:timestamp,:headersopts- Options including::api_version- API version to use:required_acks- Number of acks required (default: 1):timeout- Request timeout:partitioner- Custom partitioner module (default: configured orKafkaEx.Producer.Partitioner.Default)
Partitioning
When partition is nil, the partitioner is used to determine the partition:
- If message has a
:key, the default partitioner uses murmur2 hash for consistent partitioning - If message has no
:key, a random partition is selected
You can configure the default partitioner globally:
config :kafka_ex, partitioner: MyApp.CustomPartitionerOr per-request:
KafkaEx.API.produce(client, "topic", nil, messages, partitioner: MyApp.CustomPartitioner)Examples
# Explicit partition
messages = [%{value: "hello"}, %{key: "k1", value: "world"}]
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, messages)
# Use partitioner (partition determined by key)
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", nil, [%{key: "user-123", value: "data"}])
# Use custom partitioner
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", nil, messages,
partitioner: MyApp.RoundRobinPartitioner)
@spec produce_one(client(), topic_name(), partition_id() | nil, binary(), opts()) :: {:ok, KafkaEx.Messages.RecordMetadata.t()} | {:error, error_atom()}
Produce a single message to a Kafka topic partition.
Convenience function for producing a single message. Wraps the message in a list and calls produce/5.
Examples
# Explicit partition
{:ok, metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello")
{:ok, metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello", key: "my-key")
# Use partitioner (key-based partitioning)
{:ok, metadata} = KafkaEx.API.produce_one(client, "my-topic", nil, "hello", key: "user-123")
@spec set_consumer_group_for_auto_commit(client(), consumer_group_name()) :: :ok | {:error, :invalid_consumer_group}
Set the consumer group name that will be used by the given client for autocommit.
NOTE this function will not be supported after the legacy API is removed.
Start a new Kafka client.
Options
:brokers- List of broker tuples, e.g.,[{"localhost", 9092}]:client_id- Client identifier string- All options supported by
KafkaEx.Client.start_link/1
Examples
# Start with default configuration
{:ok, client} = KafkaEx.API.start_client()
# Start with custom brokers
{:ok, client} = KafkaEx.API.start_client(brokers: [{"kafka1", 9092}, {"kafka2", 9092}])
# Start a named client
{:ok, client} = KafkaEx.API.start_client(name: MyApp.KafkaClient)
sync_group(client, consumer_group, generation_id, member_id, opts \\ [])
View Source@spec sync_group( client(), consumer_group_name(), generation_id(), member_id(), opts() ) :: {:ok, KafkaEx.Messages.SyncGroup.t()} | {:error, error_atom()}
Synchronize consumer group state.
Completes the consumer group rebalance protocol by synchronizing state between the group leader and followers. The leader provides partition assignments which are distributed to all members. Followers receive their assigned partitions from this call.
Examples
{:ok, result} = KafkaEx.API.sync_group(client, "my-group", gen_id, member_id, assignments: assignments)
@spec topics_metadata(client(), [topic_name()], boolean()) :: {:ok, [KafkaEx.Cluster.Topic.t()]} | {:error, error_atom()}
Get topic metadata for the given topics.
Always calls out to the broker to get the most up-to-date metadata (and subsequently updates the client's state with the updated metadata). Set allow_topic_creation to true to allow the topics to be created if they don't exist.
Examples
{:ok, topics} = KafkaEx.API.topics_metadata(client, ["my-topic"])