Kafka API

This module is the main API for users of the KafkaEx library.

Most of these functions use the default worker (registered as :kafka_ex) unless a registered name or pid is passed via the worker_name option.

# create an unnamed worker
{:ok, pid} = KafkaEx.create_worker(:no_name)

KafkaEx.fetch("some_topic", 0, worker_name: pid)

Retrieve supported api versions for each api key.

Builds options to be used with workers

Returns the name of the consumer group for the given worker.

Create topics. Must provide a list of KafkaEx.Protocol.CreateTopics.TopicRequest structs, each containing all the information needed for the creation of a new topic.

create_worker creates KafkaEx workers

Delete topics. Must provide a list of topic names.

Get the offset of the earliest message still persistent in Kafka

Fetch a set of messages from Kafka from the given topic and partition ID

Sends a heartbeat to maintain membership in a consumer group.

Sends a request to join a consumer group.

Get the offset of the latest message written to Kafka

Sends a request to leave a consumer group.

Returns metadata for the given topic, or for all topics if the topic is an empty string

Get the offset of the message sent at the specified date/time

Produces batch messages to Kafka logs

Produces messages to Kafka logs (deprecated; use KafkaEx.produce/2 instead)

Callback implementation for Application.start/2.

Start and link a worker outside of a supervision tree

Stop a worker created with create_worker/2

Returns a streamable struct that may be used for consuming messages.

Sends a request to synchronize with a consumer group.

Returns true if the input is a valid consumer group or :no_consumer_group

ssl_options() :: [
  cacertfile: binary(),
  certfile: binary(),
  keyfile: binary(),
  password: binary()
]


uri() :: [{binary() | [char()], number()}]


worker_init() :: [worker_setting()]


worker_setting() ::
  {:uris, uri()}
  | {:consumer_group, binary() | :no_consumer_group}
  | {:metadata_update_interval, non_neg_integer()}
  | {:consumer_group_update_interval, non_neg_integer()}
  | {:ssl_options, ssl_options()}
  | {:initial_topics, [binary()]}

api_versions(opts \\ [])

api_versions(Keyword.t()) :: KafkaEx.Protocol.ApiVersions.Response.t()

Retrieve supported api versions for each api key.

build_worker_options(worker_init()) ::
  {:ok, worker_init()} | {:error, :invalid_consumer_group}

Builds options to be used with workers

Merges the given options with defaults from the application env config. Returns {:error, :invalid_consumer_group} if the consumer group configuration is invalid, and {:ok, merged_options} otherwise.

Note this happens automatically when using KafkaEx.create_worker.
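The merge-then-validate behaviour described above can be sketched in plain Elixir. This is a minimal illustration, not the library's implementation: the module name WorkerOptionsSketch, the default values, and the validity rule (a non-empty binary or :no_consumer_group) are assumptions made for the example.

```elixir
defmodule WorkerOptionsSketch do
  # Hypothetical defaults standing in for the application env config;
  # in KafkaEx the real values come from `config :kafka_ex, ...`.
  @defaults [
    uris: [{"localhost", 9092}],
    consumer_group: "kafka_ex",
    metadata_update_interval: 30_000,
    consumer_group_update_interval: 30_000
  ]

  # Merge caller options over the defaults, then validate the consumer group.
  def build(opts) when is_list(opts) do
    merged = Keyword.merge(@defaults, opts)

    if valid_group?(Keyword.get(merged, :consumer_group)) do
      {:ok, merged}
    else
      {:error, :invalid_consumer_group}
    end
  end

  # Assumed rule: :no_consumer_group or a non-empty binary is valid.
  defp valid_group?(:no_consumer_group), do: true
  defp valid_group?(group) when is_binary(group), do: byte_size(group) > 0
  defp valid_group?(_other), do: false
end
```

Under these assumptions, WorkerOptionsSketch.build(consumer_group: nil) returns {:error, :invalid_consumer_group}, while options that omit the consumer group fall back to the default.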

consumer_group(worker \\ Config.default_worker())

consumer_group(atom() | pid()) :: binary() | :no_consumer_group

Returns the name of the consumer group for the given worker.

Worker may be an atom or pid. If omitted, the default worker is used.

consumer_group_metadata(worker_name, supplied_consumer_group)

consumer_group_metadata(atom(), binary()) ::
create_topics(requests, opts \\ [])

create_topics([KafkaEx.Protocol.CreateTopics.TopicRequest.t()], Keyword.t()) ::

Create topics. Must provide a list of KafkaEx.Protocol.CreateTopics.TopicRequest structs, each containing all the information needed for the creation of a new topic.

create_worker(name, worker_init \\ [])

create_worker(atom(), worker_init()) :: Supervisor.on_start_child()

create_worker creates KafkaEx workers

Optional arguments(KeywordList)

  • consumer_group: Name of the group of consumers, :no_consumer_group should be passed for Kafka < 0.8.2, defaults to Application.get_env(:kafka_ex, :consumer_group)
  • uris: List of brokers in {"host", port} or comma separated value "host:port,host:port" form, defaults to Application.get_env(:kafka_ex, :brokers)
  • metadata_update_interval: How often kafka_ex would update the Kafka cluster metadata information in milliseconds, default is 30000
  • consumer_group_update_interval: How often kafka_ex would update the Kafka cluster consumer_groups information in milliseconds, default is 30000
  • use_ssl: Boolean flag specifying whether SSL should be used for the worker's connection to Kafka, default is false
  • ssl_options: see SSL OPTION DESCRIPTIONS - CLIENT SIDE at http://erlang.org/doc/man/ssl.html, default is []

Returns {:error, error_description} on invalid arguments
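The two accepted forms of the uris option are equivalent. As a rough sketch of how the comma-separated form maps onto the tuple form, the hypothetical helper below (BrokerListSketch is not part of KafkaEx) normalises either input. It assumes plain host:port entries and does not handle IPv6 literals.

```elixir
defmodule BrokerListSketch do
  # Convert "host:port,host:port" into [{"host", port}, ...].
  def parse(csv) when is_binary(csv) do
    csv
    |> String.split(",", trim: true)
    |> Enum.map(fn entry ->
      [host, port] = entry |> String.trim() |> String.split(":")
      {host, String.to_integer(port)}
    end)
  end

  # Lists already in tuple form are passed through unchanged.
  def parse(list) when is_list(list), do: list
end

uris = BrokerListSketch.parse("localhost:9092, kafka2:9093")
# With a running broker (not executed here):
# KafkaEx.create_worker(:pr, uris: uris)
```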


iex> KafkaEx.create_worker(:pr) # where :pr is the name of the worker created
{:ok, #PID<0.171.0>}
iex> KafkaEx.create_worker(:pr, uris: [{"localhost", 9092}])
{:ok, #PID<0.172.0>}
iex> KafkaEx.create_worker(:pr, [uris: [{"localhost", 9092}], consumer_group: "foo"])
{:ok, #PID<0.173.0>}
iex> KafkaEx.create_worker(:pr, consumer_group: nil)
{:error, :invalid_consumer_group}
delete_topics(topics, opts \\ [])

delete_topics([String.t()], Keyword.t()) ::

Delete topics. Must provide a list of topic names.

earliest_offset(topic, partition, name \\ Config.default_worker())

earliest_offset(binary(), integer(), atom() | pid()) ::
  [KafkaEx.Protocol.Offset.Response.t()] | :topic_not_found

Get the offset of the earliest message still persistent in Kafka


iex> KafkaEx.earliest_offset("foo", 0)
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}]
fetch(topic, partition, opts \\ [])

fetch(binary(), number(), Keyword.t()) ::
  [KafkaEx.Protocol.Fetch.Response.t()] | :topic_not_found

Fetch a set of messages from Kafka from the given topic and partition ID

Optional arguments(KeywordList)

  • offset: When supplied, the fetch starts from this offset; otherwise it starts from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
  • worker_name: the worker we want to run this fetch request through. Default is :kafka_ex
  • wait_time: maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. Default is 10
  • min_bytes: minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting wait_time to 100 and setting min_bytes 64000 would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). Default is 1
  • max_bytes: maximum bytes to include in the message set for this partition. This helps bound the size of the response. Default is 1,000,000
  • auto_commit: specifies whether the last offset should be committed. Default is true. You must set this to false when using Kafka < 0.8.2 or :no_consumer_group.
  • api_version: Version of the Fetch API message to send (Kayrock client only, default: 0)
  • offset_commit_api_version: Version of the OffsetCommit API message to send (Kayrock client only, only relevant for auto commit, default: 0, use 2+ to store offsets in Kafka instead of Zookeeper)


iex> KafkaEx.fetch("foo", 0, offset: 0)
[
  %KafkaEx.Protocol.Fetch.Response{partitions: [
    %{error_code: 0, hw_mark_offset: 1, message_set: [
      %{attributes: 0, crc: 748947812, key: nil, offset: 0, value: "hey foo"}
    ], partition: 0}
  ], topic: "foo"}
]
heartbeat(request, opts \\ [])

heartbeat(KafkaEx.Protocol.Heartbeat.Request.t(), Keyword.t()) ::

Sends a heartbeat to maintain membership in a consumer group.

join_group(request, opts \\ [])

join_group(KafkaEx.Protocol.JoinGroup.Request.t(), Keyword.t()) ::

Sends a request to join a consumer group.

latest_offset(topic, partition, name \\ Config.default_worker())

latest_offset(binary(), integer(), atom() | pid()) ::
  [KafkaEx.Protocol.Offset.Response.t()] | :topic_not_found

Get the offset of the latest message written to Kafka


iex> KafkaEx.latest_offset("foo", 0)
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [16], partition: 0}], topic: "foo"}]
leave_group(request, opts \\ [])

leave_group(KafkaEx.Protocol.LeaveGroup.Request.t(), Keyword.t()) ::

Sends a request to leave a consumer group.


metadata(Keyword.t()) :: KafkaEx.Protocol.Metadata.Response.t()

Returns metadata for the given topic, or for all topics if the topic is an empty string

Optional arguments(KeywordList)

  • worker_name: the worker we want to run this metadata request through, when none is provided the default worker :kafka_ex is used
  • topic: name of the topic for which metadata is requested, when none is provided all metadata is retrieved


iex> KafkaEx.create_worker(:mt)
iex> KafkaEx.metadata(topic: "foo", worker_name: :mt)
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "",
   node_id: 49162, port: 49162, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: 0,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0,
     isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
   topic: "foo"}]}
offset(topic, partition, time, name \\ Config.default_worker())

  :calendar.datetime() | :earliest | :latest,
  atom() | pid()
) :: [KafkaEx.Protocol.Offset.Response.t()] | :topic_not_found

Get the offset of the message sent at the specified date/time


iex> KafkaEx.offset("foo", 0, {{2015, 3, 29}, {23, 56, 40}}) # Note that the time specified should match or be ahead of the time on the server that Kafka runs on
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [256], partition: 0}], topic: "foo"}]
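The time argument is an Erlang :calendar.datetime() tuple. If the timestamp is held as an Elixir NaiveDateTime, the standard library function NaiveDateTime.to_erl/1 produces that tuple. The snippet below is a small sketch; the KafkaEx call is left commented out because it needs a running broker.

```elixir
# Convert an Elixir NaiveDateTime into the :calendar.datetime() tuple
# that the time argument expects.
time = NaiveDateTime.to_erl(~N[2015-03-29 23:56:40])
# time is {{2015, 3, 29}, {23, 56, 40}}

# With a running broker and the default worker (not executed here):
# KafkaEx.offset("foo", 0, time)
```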
offset_commit(worker_name, offset_commit_request)

offset_commit(atom(), KafkaEx.Protocol.OffsetCommit.Request.t()) :: [
offset_fetch(worker_name, offset_fetch_request)

offset_fetch(atom(), KafkaEx.Protocol.OffsetFetch.Request.t()) ::
  [KafkaEx.Protocol.OffsetFetch.Response.t()] | :topic_not_found
produce(produce_request, opts \\ [])

produce(KafkaEx.Protocol.Produce.Request.t(), Keyword.t()) ::
  | :ok
  | {:ok, integer()}
  | {:error, :closed}
  | {:error, :inet.posix()}
  | {:error, any()}
  | iodata()
  | :leader_not_available

Produces batch messages to Kafka logs

Optional arguments(KeywordList)

  • worker_name: the worker we want to run this produce request through, when none is provided the default worker :kafka_ex is used


iex> KafkaEx.produce(%KafkaEx.Protocol.Produce.Request{topic: "foo", partition: 0, required_acks: 1, messages: [%KafkaEx.Protocol.Produce.Message{value: "hey"}]})
{:ok, 9772}
iex> KafkaEx.produce(%KafkaEx.Protocol.Produce.Request{topic: "foo", partition: 0, required_acks: 1, messages: [%KafkaEx.Protocol.Produce.Message{value: "hey"}]}, worker_name: :pr)
{:ok, 9773}
produce(topic, partition, value, opts \\ [])

produce(binary(), number(), binary(), Keyword.t()) ::
  | :ok
  | {:ok, integer()}
  | {:error, :closed}
  | {:error, :inet.posix()}
  | {:error, any()}
  | iodata()
  | :leader_not_available

Produces messages to Kafka logs (deprecated; use KafkaEx.produce/2 instead)

Optional arguments(KeywordList)

  • worker_name: the worker we want to run this produce request through, when none is provided the default worker :kafka_ex is used
  • key: used for partition assignment; may be nil, which is also the default
  • required_acks: indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas), default is 0
  • timeout: provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks, default is 100 milliseconds
  • compression: specifies the compression type (:none, :snappy, :gzip)
  • api_version: Version of the Produce API message to send (Kayrock client only, default: 0)
  • timestamp: unix epoch timestamp in milliseconds for the message (Kayrock client only, default: nil, must be using api_version >= 3)
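The required_acks rules listed above can be summarised as a small pure function. This is only a restatement of the option description for illustration; RequiredAcksSketch and acks_awaited/2 are hypothetical names, and the second argument (the number of in-sync replicas) is an assumed input that a client would not normally supply.

```elixir
defmodule RequiredAcksSketch do
  # Returns how many acknowledgements the broker waits for before replying,
  # or :no_response when the broker does not reply at all.
  def acks_awaited(0, _isr_count), do: :no_response
  # -1 means "wait for every in-sync replica".
  def acks_awaited(-1, isr_count), do: isr_count
  # Any positive number is capped at the number of in-sync replicas.
  def acks_awaited(n, isr_count) when n >= 1, do: min(n, isr_count)
end
```

For example, with three in-sync replicas a value of 5 behaves like 3, while 0 means the caller gets no broker response to confirm the write.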


iex> KafkaEx.produce("bar", 0, "hey")
iex> KafkaEx.produce("foo", 0, "hey", [worker_name: :pr, required_acks: 1])
{:ok, 9771}

Callback implementation for Application.start/2.


stop_worker(atom() | pid()) ::
  :ok | {:error, :not_found} | {:error, :simple_one_for_one}

Stop a worker created with create_worker/2

Returns :ok on success or an {:error, reason} tuple if worker is not a valid worker

stream(topic, partition, opts \\ [])

stream(binary(), integer(), Keyword.t()) :: KafkaEx.Stream.t()

Returns a streamable struct that may be used for consuming messages.

The returned struct is compatible with the Stream and Enum modules. Some important usage notes follow; see below for a detailed list of options.

iex> KafkaEx.produce("foo", 0, "hey")
iex> KafkaEx.produce("foo", 0, "hi")
iex> stream = KafkaEx.stream("foo", 0)
iex> Enum.take(stream, 2)
[%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1784030606, key: "",
    offset: 0, value: "hey"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3776653906, key: "",
     offset: 1, value: "hi"}]
iex> stream |> Stream.map(fn(msg) -> IO.puts(msg.value) end) |> Stream.run
#  NOTE this will block!  See below.

Reusing streams

Reusing the same KafkaEx.Stream struct results in consuming the same messages multiple times. This is by design and mirrors the functionality of File.stream!/3. If you want to reuse the same stream struct, update its :offset before reuse.

iex> stream = KafkaEx.stream("foo", 0)
iex> [m1, m2] = Enum.take(stream, 2)
iex> [m1, m2] = Enum.take(stream, 2)   # these will be the same messages
iex> stream = %{stream | fetch_request: %{stream.fetch_request | offset: m2.offset + 1}}
iex> [m3, m4] = Enum.take(stream, 2)   # new messages

Streams block at log end

By default, the stream consumes indefinitely and will block at log end until new messages are available. Use the no_wait_at_logend: true option to have the stream halt when no more messages are available. This mirrors the command line arguments of SimpleConsumerShell.

Note that this means that fetches will return up to as many messages as are immediately available in the partition, regardless of arguments.

iex> Enum.map(1..3, fn(ix) -> KafkaEx.produce("bar", 0, "Msg #{ix}") end)
iex> stream = KafkaEx.stream("bar", 0, no_wait_at_logend: true, offset: 0)
iex> Enum.map(stream, fn(m) -> m.value end) # does not block
["Msg 1", "Msg 2", "Msg 3"]
iex> stream |> Stream.map(fn(m) -> m.value end) |> Enum.take(10)
# only 3 messages are available
["Msg 1", "Msg 2", "Msg 3"]

Consumer group and auto commit

If you pass a value for the consumer_group option and true for auto_commit, the offset of the last message consumed will be committed to the broker during each cycle.

For example, suppose we start at the beginning of a partition with millions of messages and the max_bytes setting is such that each fetch request gets 25 messages. In this setting, we will (roughly) be committing offsets 25, 50, 75, etc.

Note that offsets are committed immediately after messages are retrieved and before you know if you have successfully consumed them. It is therefore possible that you could miss messages if your consumer crashes in the middle of consuming a batch, effectively losing the guarantee of at-least-once delivery. If you need this guarantee, we recommend that you construct a GenServer-based consumer module and manage your commits manually.

iex> Enum.map(1..10, fn(ix) -> KafkaEx.produce("baz", 0, "Msg #{ix}") end)
iex> stream = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true)
iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end)
["Msg 1", "Msg 2"]
iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end)
["Msg 1", "Msg 2"]  # same values
iex> stream2 = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true)
iex> stream2 |> Enum.take(1) |> Enum.map(fn(msg) -> msg.value end)
["Msg 3"] # stream2 got the next available offset
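To keep at-least-once delivery, commit an offset only after the message has been handled successfully. The sketch below shows that ordering without a broker: handle and commit are injected functions, and the module name and message maps are hypothetical. In a real consumer, the messages would presumably come from KafkaEx.fetch/3 with auto_commit: false, and commit would wrap KafkaEx.offset_commit/2.

```elixir
defmodule AtLeastOnceSketch do
  # Handle each message in order and commit its offset only after the
  # handler returns :ok. Stops at the first failure, leaving that message
  # (and everything after it) uncommitted so it can be retried.
  def consume(messages, handle, commit) do
    Enum.reduce_while(messages, :ok, fn msg, :ok ->
      case handle.(msg) do
        :ok ->
          commit.(msg.offset)
          {:cont, :ok}

        {:error, reason} ->
          {:halt, {:error, reason, msg.offset}}
      end
    end)
  end
end

# Stand-ins for fetched messages and for the commit call.
messages = [%{offset: 0, value: "a"}, %{offset: 1, value: "b"}, %{offset: 2, value: "c"}]
{:ok, committed} = Agent.start_link(fn -> [] end)
commit = fn offset -> Agent.update(committed, fn acc -> [offset | acc] end) end

# A handler that fails on the third message.
handle = fn
  %{value: "c"} -> {:error, :boom}
  _msg -> :ok
end

result = AtLeastOnceSketch.consume(messages, handle, commit)
```

Here only offsets 0 and 1 are committed, so a restarted consumer would see the failed message again rather than skipping it.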


KafkaEx.stream/3 accepts a keyword list of options for the third argument.

  • no_wait_at_logend (boolean): Set this to true to halt the stream when there are no more messages available. Defaults to false, i.e., the stream blocks to wait for new messages.

  • worker_name (term): The KafkaEx worker to use for communication with the brokers. Defaults to :kafka_ex (the default worker).

  • consumer_group (string): Name of the consumer group used for the initial offset fetch and automatic offset commit (if auto_commit is true). Omit this value or use :no_consumer_group to not use a consumer group (default). Consumer groups are not compatible with Kafka < 0.8.2.

  • offset (integer): The offset from which to start fetching. By default, this is the last available offset of the partition when no consumer group is specified. When a consumer group is specified, the next message after the last committed offset is used. For Kafka < 0.8.2 you must explicitly specify an offset.

  • auto_commit (boolean): If true, the stream automatically commits offsets of fetched messages. See discussion above.

  • api_versions (map): Allows overriding API versions for :fetch, :offset_fetch, and :offset_commit when using the Kayrock client. Defaults to %{fetch: 0, offset_fetch: 0, offset_commit: 0}. Use %{fetch: 3, offset_fetch: 3, offset_commit: 3} with the Kayrock client to store offsets in Kafka (instead of Zookeeper) and to fetch messages with timestamps.

sync_group(request, opts \\ [])

sync_group(KafkaEx.Protocol.SyncGroup.Request.t(), Keyword.t()) ::

Sends a request to synchronize with a consumer group.

valid_consumer_group?(any()) :: boolean()

Returns true if the input is a valid consumer group or :no_consumer_group