KafkaEx.API (kafka_ex v1.0.0-rc.1)

The primary Kafka client API for KafkaEx v1.0+

This module provides two ways to interact with Kafka:

Direct API Usage

All functions take a client pid as the first argument:

{:ok, client} = KafkaEx.API.start_client()

# Produce a message
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, [%{value: "hello"}])

# Fetch messages
{:ok, fetch_result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)

Using use KafkaEx.API

You can also use this module in your own module to get API functions that automatically use a configured client:

defmodule MyApp.Kafka do
  use KafkaEx.API

  # Override client/0 to return your client pid
  def client do
    MyApp.KafkaClient  # a named client
  end
end

# Now call without passing client:
{:ok, metadata} = MyApp.Kafka.produce("my-topic", 0, [%{value: "hello"}])

Options for use

  • :client - A module, atom, or function that returns the client pid. If not provided, you must implement client/0 in your module.

Example with static client name:

defmodule MyApp.Kafka do
  use KafkaEx.API, client: MyApp.KafkaClient
end

Example with dynamic client lookup:

defmodule MyApp.Kafka do
  use KafkaEx.API

  def client do
    Process.whereis(:my_kafka_client) || raise "Client not started"
  end
end
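
Putting the pieces together, a minimal end-to-end sketch (the broker address and module names are illustrative, assuming a reachable broker):

```elixir
# Start a named client (broker address is an assumption)
{:ok, _pid} =
  KafkaEx.API.start_client(name: MyApp.KafkaClient, brokers: [{"localhost", 9092}])

# The wrapper module resolves the client automatically via client/0
{:ok, _metadata} = MyApp.Kafka.produce("my-topic", 0, [%{value: "hello"}])
```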

Summary

Functions

Fetch API versions supported by the Kafka broker.

Returns a child specification for starting a client under a supervisor.

Returns the cluster metadata from the given client.

Returns the current correlation id for the given client.

Creates a single topic with the given configuration.

Creates one or more topics in the Kafka cluster.

Deletes a single topic from the Kafka cluster.

Deletes topics from the Kafka cluster.

Sends a request to describe a group identified by its name.

Fetch the earliest offset for a given partition.

Fetch records from a Kafka topic partition.

Fetch all available records from a topic partition.

Find the coordinator broker for a consumer group or transaction.

Send a heartbeat to a consumer group coordinator.

Fetch the latest offset for a given partition.

Returns a list of offsets per topic and partition.

Fetch metadata from Kafka brokers.

Fetch metadata for specific topics.

Produce messages to a Kafka topic partition.

Produce a single message to a Kafka topic partition.

Set the consumer group name that will be used by the given client for autocommit.

Start a new Kafka client.

Get topic metadata for the given topics.

Types

@type client() :: GenServer.server()
@type consumer_group_name() :: KafkaEx.Support.Types.consumer_group_name()
@type correlation_id() :: non_neg_integer()
@type error_atom() :: atom()
@type generation_id() :: non_neg_integer()
@type member_id() :: binary()
@type node_id() :: non_neg_integer()
@type offset_val() :: KafkaEx.Support.Types.offset()
@type opts() :: Keyword.t()
@type partition_id() :: KafkaEx.Support.Types.partition()
@type partition_id_request() :: %{partition_num: partition_id()}
@type partition_offset_commit_request() :: %{
  partition_num: partition_id(),
  offset: offset_val()
}
@type partition_offset_request() :: %{
  partition_num: partition_id(),
  timestamp: timestamp_request()
}
@type timestamp_request() :: KafkaEx.Support.Types.timestamp_request()
@type topic_name() :: KafkaEx.Support.Types.topic()

Functions

api_versions(client, opts \\ [])

@spec api_versions(client(), opts()) ::
  {:ok, KafkaEx.Messages.ApiVersions.t()} | {:error, error_atom()}

Fetch API versions supported by the Kafka broker.

Queries the broker to discover which Kafka API versions it supports. This enables the client to negotiate compatible API versions for all operations.

The response includes the minimum and maximum supported versions for each Kafka API, identified by their API key.

Examples

{:ok, versions} = KafkaEx.API.api_versions(client)

child_spec(opts)

@spec child_spec(opts()) :: Supervisor.child_spec()

Returns a child specification for starting a client under a supervisor.

Examples

children = [
  {KafkaEx.API, name: MyApp.KafkaClient, brokers: [{"localhost", 9092}]}
]

Supervisor.start_link(children, strategy: :one_for_one)
cluster_metadata(client)

@spec cluster_metadata(client()) :: {:ok, KafkaEx.Cluster.ClusterMetadata.t()}

Returns the cluster metadata from the given client.

This returns the cluster metadata currently cached in the client's state. It does not make a network request to Kafka. If you need fresh metadata, use metadata/2 or metadata/3 instead.

Examples

{:ok, cluster_metadata} = KafkaEx.API.cluster_metadata(client)
commit_offset(client, consumer_group, topic, partitions, opts \\ [])

@spec commit_offset(
  client(),
  consumer_group_name(),
  topic_name(),
  [partition_offset_commit_request()],
  opts()
) :: {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, error_atom()}

Commit offsets for a consumer group.

Stores the committed offsets for the specified topic/partition(s) for a consumer group. This is used for manual offset management and consumer group coordination.

Examples

partitions = [%{partition_num: 0, offset: 100}]
{:ok, result} = KafkaEx.API.commit_offset(client, "my-group", "my-topic", partitions)
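
Committed offsets can be read back with fetch_committed_offset/5. A small round-trip sketch, assuming the commit succeeds:

```elixir
# Commit offset 100 for partition 0, then read it back
{:ok, _} =
  KafkaEx.API.commit_offset(client, "my-group", "my-topic", [
    %{partition_num: 0, offset: 100}
  ])

{:ok, offsets} =
  KafkaEx.API.fetch_committed_offset(client, "my-group", "my-topic", [
    %{partition_num: 0}
  ])
```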

correlation_id(client)

@spec correlation_id(client()) :: {:ok, correlation_id()}

Returns the current correlation id for the given client.
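
A minimal example:

```elixir
{:ok, correlation_id} = KafkaEx.API.correlation_id(client)
```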

create_topic(client, topic_name, opts \\ [])

@spec create_topic(client(), topic_name(), opts()) ::
  {:ok, KafkaEx.Messages.CreateTopics.t()} | {:error, error_atom()}

Creates a single topic with the given configuration.

This is a convenience function that wraps create_topics/4 for creating a single topic.

Parameters

  • client - The client pid
  • topic_name - The name of the topic to create
  • opts - Keyword list with:
    • :num_partitions - Number of partitions (default: -1 for broker default)
    • :replication_factor - Replication factor (default: -1 for broker default)
    • :config_entries - Topic configuration entries (default: [])
    • :timeout - Request timeout in milliseconds (default: 10_000)
    • :validate_only - If true, only validate (V1+, default: false)
    • :api_version - API version to use (default: 1)

Examples

# Create with defaults
{:ok, result} = KafkaEx.API.create_topic(client, "my-topic")

# Create with specific configuration
{:ok, result} = KafkaEx.API.create_topic(client, "my-topic", num_partitions: 6, replication_factor: 3, timeout: 30_000)
create_topics(client, topics, timeout, opts \\ [])

@spec create_topics(client(), list(), non_neg_integer(), opts()) ::
  {:ok, KafkaEx.Messages.CreateTopics.t()} | {:error, error_atom()}

Creates one or more topics in the Kafka cluster.

Parameters

  • client - The client pid

  • topics - List of topic configurations, each being a keyword list or map with:

    • :topic - Topic name (required)
    • :num_partitions - Number of partitions (default: -1 for broker default)
    • :replication_factor - Replication factor (default: -1 for broker default)
    • :replica_assignment - Manual replica assignment (default: [])
    • :config_entries - Topic configuration entries as {name, value} tuples (default: [])
  • timeout - Request timeout in milliseconds (required)

  • opts - Optional keyword list with:

    • :validate_only - If true, only validate without creating topics (V1+, default: false)
    • :api_version - API version to use (default: 1)

Examples

# Create a topic with broker defaults
{:ok, result} = KafkaEx.API.create_topics(client, [[topic: "my-topic"]], 10_000)

# Create a topic with custom configuration
{:ok, result} =
  KafkaEx.API.create_topics(
    client,
    [
      [
        topic: "my-topic",
        num_partitions: 3,
        replication_factor: 2,
        config_entries: [
          {"cleanup.policy", "compact"},
          {"retention.ms", "86400000"}
        ]
      ]
    ],
    10_000
  )

# Validate only (V1+)
{:ok, result} =
  KafkaEx.API.create_topics(client, [[topic: "my-topic", num_partitions: 3]], 10_000,
    validate_only: true
  )

# Check results
if CreateTopics.success?(result) do
  IO.puts("All topics created successfully")
else
  for failed <- CreateTopics.failed_topics(result) do
    IO.puts("Failed to create #{failed.topic}: #{failed.error}")
  end
end

delete_topic(client, topic_name, opts \\ [])

@spec delete_topic(client(), topic_name(), opts()) ::
  {:ok, KafkaEx.Messages.DeleteTopics.t()} | {:error, error_atom()}

Deletes a single topic from the Kafka cluster.

This is a convenience function that wraps delete_topics/4 for deleting a single topic.

Parameters

  • client - The client pid
  • topic_name - The name of the topic to delete
  • opts - Keyword list with:
    • :timeout - Request timeout in milliseconds (default: 30_000)
    • :api_version - API version to use (default: 1)

Examples

# Delete with defaults
{:ok, result} = KafkaEx.API.delete_topic(client, "my-topic")

# Delete with custom timeout
{:ok, result} = KafkaEx.API.delete_topic(client, "my-topic", timeout: 60_000)
delete_topics(client, topics, timeout, opts \\ [])

@spec delete_topics(client(), [topic_name()], timeout(), opts()) ::
  {:ok, KafkaEx.Messages.DeleteTopics.t()} | {:error, error_atom()}

Deletes topics from the Kafka cluster.

Parameters

  • client - The client pid
  • topics - List of topic names to delete
  • timeout - Request timeout in milliseconds
  • opts - Keyword list with:
    • :api_version - API version to use (default: 1)

Returns

  • {:ok, DeleteTopics.t()} - Result with status for each topic
  • {:error, error_atom} - Error if the request failed

Examples

# Delete a single topic
{:ok, result} = KafkaEx.API.delete_topics(client, ["my-topic"], 30_000)

# Delete multiple topics
{:ok, result} = KafkaEx.API.delete_topics(client, ["topic1", "topic2", "topic3"], 30_000)

# Check results
if DeleteTopics.success?(result) do
  IO.puts("All topics deleted successfully")
else
  failed = DeleteTopics.failed_topics(result)
  IO.inspect(failed, label: "Failed to delete")
end
describe_group(client, consumer_group_name, opts \\ [])

@spec describe_group(client(), consumer_group_name(), opts()) ::
  {:ok, KafkaEx.Messages.ConsumerGroupDescription.t()} | {:error, any()}

Sends a request to describe a group identified by its name.

We support only one consumer group per request for now, as we don't group requests by group coordinator.

Examples

{:ok, description} = KafkaEx.API.describe_group(client, "my-group")
earliest_offset(client, topic, partition_id, opts \\ [])

@spec earliest_offset(client(), topic_name(), partition_id(), opts()) ::
  {:ok, offset_val()} | {:error, error_atom()}

Fetch the earliest offset for a given partition.

Examples

{:ok, offset} = KafkaEx.API.earliest_offset(client, "my-topic", 0)
fetch(client, topic, partition, offset, opts \\ [])

@spec fetch(client(), topic_name(), partition_id(), offset_val(), opts()) ::
  {:ok, KafkaEx.Messages.Fetch.t()} | {:error, error_atom()}

Fetch records from a Kafka topic partition.

Retrieves records from the specified topic and partition starting from the given offset. Returns the fetched records along with metadata about the fetch (high watermark, etc).

Options

  • :max_bytes - Maximum bytes to fetch per partition (default: 1,000,000)
  • :max_wait_time - Maximum time to wait for records in ms (default: 10,000)
  • :min_bytes - Minimum bytes to accumulate before returning (default: 1)
  • :isolation_level - 0 for READ_UNCOMMITTED, 1 for READ_COMMITTED (V4+, default: 0)
  • :api_version - API version to use (default: 3)

Examples

{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)
messages = result.records
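
Successive fetches can be chained into a simple read loop. A sketch, assuming each record in result.records exposes an :offset field (the field name is an assumption, not confirmed by this page):

```elixir
# Read from the earliest offset, then advance past the last record returned
{:ok, start_offset} = KafkaEx.API.earliest_offset(client, "my-topic", 0)
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, start_offset)

next_offset =
  case List.last(result.records) do
    nil -> start_offset
    last -> last.offset + 1  # assumed field; check your record struct
  end

{:ok, _more} = KafkaEx.API.fetch(client, "my-topic", 0, next_offset)
```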
fetch_all(client, topic, partition, opts \\ [])

@spec fetch_all(client(), topic_name(), partition_id(), opts()) ::
  {:ok, KafkaEx.Messages.Fetch.t()} | {:error, error_atom()}

Fetch all available records from a topic partition.

Convenience function that fetches records from the earliest offset up to the high watermark. Useful for reading all data in a partition (within the configured max_bytes limit).

Examples

{:ok, result} = KafkaEx.API.fetch_all(client, "my-topic", 0)
fetch_committed_offset(client, consumer_group, topic, partitions, opts \\ [])

@spec fetch_committed_offset(
  client(),
  consumer_group_name(),
  topic_name(),
  [partition_id_request()],
  opts()
) :: {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, error_atom()}

Fetch committed offsets for a consumer group.

Retrieves the last committed offsets for the specified topic/partition(s) for a consumer group. This is useful for tracking consumer group progress and implementing manual offset management.

Examples

partitions = [%{partition_num: 0}]
{:ok, offsets} = KafkaEx.API.fetch_committed_offset(client, "my-group", "my-topic", partitions)
find_coordinator(client, group_id, opts \\ [])

@spec find_coordinator(client(), consumer_group_name(), opts()) ::
  {:ok, KafkaEx.Messages.FindCoordinator.t()} | {:error, error_atom()}

Find the coordinator broker for a consumer group or transaction.

Discovers which broker is the coordinator for the specified consumer group (or transactional producer). This is used internally for consumer group operations but can also be called directly.

Options

  • :coordinator_type - 0 for group (default), 1 for transaction
  • :api_version - API version to use (default: 1)

Examples

# Find group coordinator
{:ok, coordinator} = KafkaEx.API.find_coordinator(client, "my-consumer-group")

# Find transaction coordinator
{:ok, coordinator} = KafkaEx.API.find_coordinator(client, "my-transactional-id",
  coordinator_type: :transaction)
heartbeat(client, consumer_group, member_id, generation_id, opts \\ [])

@spec heartbeat(client(), consumer_group_name(), member_id(), generation_id(), opts()) ::
  {:ok, KafkaEx.Messages.Heartbeat.t()} | {:error, error_atom()}

Send a heartbeat to a consumer group coordinator.

Sends a periodic heartbeat to the group coordinator to indicate that the consumer is still active and participating in the consumer group. This is required to maintain group membership and prevent rebalancing.

Examples

{:ok, result} = KafkaEx.API.heartbeat(client, "my-group", member_id, generation_id)
join_group(client, consumer_group, member_id, opts \\ [])

@spec join_group(client(), consumer_group_name(), member_id(), opts()) ::
  {:ok, KafkaEx.Messages.JoinGroup.t()} | {:error, error_atom()}

Join a consumer group.

Initiates the consumer group join protocol. This is the first step in the consumer group coordination protocol. The consumer sends its metadata (topics it's interested in) and receives back group membership information including the generation ID and member ID.

If this member is elected as the group leader, it will receive the metadata of all members and is responsible for computing partition assignments (via sync_group/5).

API Version Differences

  • V0: Basic JoinGroup with session_timeout
  • V1: Adds separate rebalance_timeout field
  • V2: Adds throttle_time_ms to response

Examples

{:ok, result} = KafkaEx.API.join_group(client, "my-group", "")
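
The full coordination handshake chains join_group/4, sync_group/5, heartbeat/5, and leave_group/4. A sketch, assuming the JoinGroup result exposes generation_id and member_id fields (those field names are assumptions):

```elixir
{:ok, join} = KafkaEx.API.join_group(client, "my-group", "")

# If elected leader, compute partition assignments here before syncing (omitted)
{:ok, _sync} =
  KafkaEx.API.sync_group(client, "my-group", join.generation_id, join.member_id)

# Periodically, while consuming, to keep group membership alive:
{:ok, _} = KafkaEx.API.heartbeat(client, "my-group", join.member_id, join.generation_id)

# On shutdown, to trigger an immediate rebalance:
{:ok, _} = KafkaEx.API.leave_group(client, "my-group", join.member_id)
```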
latest_offset(client, topic, partition_id, opts \\ [])

@spec latest_offset(client(), topic_name(), partition_id(), opts()) ::
  {:ok, offset_val()} | {:error, error_atom()}

Fetch the latest offset for a given partition.

Examples

{:ok, offset} = KafkaEx.API.latest_offset(client, "my-topic", 0)
leave_group(client, consumer_group, member_id, opts \\ [])

@spec leave_group(client(), consumer_group_name(), member_id(), opts()) ::
  {:ok, :no_error | KafkaEx.Messages.LeaveGroup.t()} | {:error, error_atom()}

Leave a consumer group.

Notifies the group coordinator that a consumer is voluntarily leaving the consumer group. This allows the coordinator to immediately trigger a rebalance without waiting for a session timeout, improving rebalance latency.

Examples

{:ok, result} = KafkaEx.API.leave_group(client, "my-group", member_id)
list_offsets(client, list, opts \\ [])

@spec list_offsets(client(), [{topic_name(), [partition_offset_request()]}], opts()) ::
  {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, any()}

Returns a list of offsets per topic and partition.

Only one topic/partition pair per request is supported for now, since requests are not grouped by partition leader.

Examples

partitions = [%{partition_num: 0, timestamp: :latest}]
{:ok, offsets} = KafkaEx.API.list_offsets(client, [{"my-topic", partitions}])
metadata(client, opts \\ [])

@spec metadata(client(), opts()) ::
  {:ok, KafkaEx.Cluster.ClusterMetadata.t()} | {:error, error_atom()}

Fetch metadata from Kafka brokers.

Makes a network request to fetch fresh metadata from Kafka. This updates the client's internal cluster metadata state and returns the updated metadata.

By default, fetches metadata for all topics in the cluster. To fetch metadata for specific topics only, use metadata/3.

Examples

{:ok, metadata} = KafkaEx.API.metadata(client)
{:ok, metadata} = KafkaEx.API.metadata(client, api_version: 2)
metadata(client, topics, opts)

@spec metadata(client(), [topic_name()] | nil, opts()) ::
  {:ok, KafkaEx.Cluster.ClusterMetadata.t()} | {:error, error_atom()}

Fetch metadata for specific topics.

Makes a network request to fetch fresh metadata for the specified topics only. This is more efficient than fetching all topics if you only need information about a subset of topics.

Examples

{:ok, metadata} = KafkaEx.API.metadata(client, ["topic1", "topic2"], [])
produce(client, topic, partition, messages, opts \\ [])

@spec produce(client(), topic_name(), partition_id() | nil, [map()], opts()) ::
  {:ok, KafkaEx.Messages.RecordMetadata.t()} | {:error, error_atom()}

Produce messages to a Kafka topic partition.

Sends one or more messages to the specified topic and partition. Returns the base offset assigned to the first message in the batch.

Parameters

  • client - The client pid
  • topic - Topic name
  • partition - Partition number, or nil to use partitioner
  • messages - List of message maps with :value, optional :key, :timestamp, :headers
  • opts - Options including:
    • :api_version - API version to use
    • :required_acks - Number of acks required (default: 1)
    • :timeout - Request timeout
    • :partitioner - Custom partitioner module (default: configured or KafkaEx.Producer.Partitioner.Default)

Partitioning

When partition is nil, the partitioner is used to determine the partition:

  • If message has a :key, the default partitioner uses murmur2 hash for consistent partitioning
  • If message has no :key, a random partition is selected

You can configure the default partitioner globally:

config :kafka_ex, partitioner: MyApp.CustomPartitioner

Or per-request:

KafkaEx.API.produce(client, "topic", nil, messages, partitioner: MyApp.CustomPartitioner)

Examples

# Explicit partition
messages = [%{value: "hello"}, %{key: "k1", value: "world"}]
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, messages)

# Use partitioner (partition determined by key)
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", nil, [%{key: "user-123", value: "data"}])

# Use custom partitioner
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", nil, messages,
  partitioner: MyApp.RoundRobinPartitioner)
produce_one(client, topic, partition, value, opts \\ [])

@spec produce_one(client(), topic_name(), partition_id() | nil, binary(), opts()) ::
  {:ok, KafkaEx.Messages.RecordMetadata.t()} | {:error, error_atom()}

Produce a single message to a Kafka topic partition.

Convenience function for producing a single message. Wraps the message in a list and calls produce/5.

Examples

# Explicit partition
{:ok, metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello")
{:ok, metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello", key: "my-key")

# Use partitioner (key-based partitioning)
{:ok, metadata} = KafkaEx.API.produce_one(client, "my-topic", nil, "hello", key: "user-123")
set_consumer_group_for_auto_commit(client, consumer_group)

@spec set_consumer_group_for_auto_commit(client(), consumer_group_name()) ::
  :ok | {:error, :invalid_consumer_group}

Set the consumer group name that will be used by the given client for autocommit.

NOTE: this function will no longer be supported once the legacy API is removed.
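
A minimal example (the group name is illustrative):

```elixir
:ok = KafkaEx.API.set_consumer_group_for_auto_commit(client, "my-group")
```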

start_client(opts \\ [])

@spec start_client(opts()) :: {:ok, pid()} | {:error, term()}

Start a new Kafka client.

Options

  • :brokers - List of broker tuples, e.g., [{"localhost", 9092}]
  • :client_id - Client identifier string
  • All options supported by KafkaEx.Client.start_link/1

Examples

# Start with default configuration
{:ok, client} = KafkaEx.API.start_client()

# Start with custom brokers
{:ok, client} = KafkaEx.API.start_client(brokers: [{"kafka1", 9092}, {"kafka2", 9092}])

# Start a named client
{:ok, client} = KafkaEx.API.start_client(name: MyApp.KafkaClient)
sync_group(client, consumer_group, generation_id, member_id, opts \\ [])

@spec sync_group(
  client(),
  consumer_group_name(),
  generation_id(),
  member_id(),
  opts()
) ::
  {:ok, KafkaEx.Messages.SyncGroup.t()} | {:error, error_atom()}

Synchronize consumer group state.

Completes the consumer group rebalance protocol by synchronizing state between the group leader and followers. The leader provides partition assignments which are distributed to all members. Followers receive their assigned partitions from this call.

Examples

{:ok, result} = KafkaEx.API.sync_group(client, "my-group", gen_id, member_id, assignments: assignments)
topics_metadata(client, topics, allow_topic_creation \\ false)

@spec topics_metadata(client(), [topic_name()], boolean()) ::
  {:ok, [KafkaEx.Cluster.Topic.t()]} | {:error, error_atom()}

Get topic metadata for the given topics.

Always calls out to the broker to get the most up-to-date metadata (and subsequently updates the client's state with the updated metadata). Set allow_topic_creation to true to allow the topics to be created if they don't exist.

Examples

{:ok, topics} = KafkaEx.API.topics_metadata(client, ["my-topic"])