KafkaEx

Summary

consumer_group_metadata(worker_name, consumer_group)
create_worker(name, worker_init \\ [])

create_worker creates KafkaEx workers

earliest_offset(topic, partition, name \\ KafkaEx.Server)

Get the offset of the earliest message still persistent in Kafka

fetch(topic, partition, opts \\ [])

Fetch a set of messages from Kafka from the given topic and partition ID

latest_offset(topic, partition, name \\ KafkaEx.Server)

Get the offset of the latest message written to Kafka

metadata(opts \\ [])

Returns metadata for the given topic; if the topic is an empty string, returns metadata for all topics

offset(topic, partition, time, name \\ KafkaEx.Server)

Get the offset of the message sent at the specified date/time

offset_commit(worker_name, offset_commit_request)
offset_fetch(worker_name, offset_fetch_request)
produce(topic, partition, value, opts \\ [])

Produces messages to Kafka logs

start(type, args)

Callback implementation of :application.start/2

stop_streaming(opts \\ [])
stream(topic, partition, opts \\ [])

Returns a stream that consumes fetched messages. This puts the specified worker in streaming mode and blocks the worker indefinitely. The handler is a normal GenEvent handler, so you can supply a custom one; otherwise a default handler is used

Types

uri :: [{binary | char_list, number}]

worker_init :: [uris: uri, consumer_group: binary]

Functions

consumer_group_metadata(worker_name, consumer_group)

create_worker(name, worker_init \\ [])

create_worker creates KafkaEx workers

Example

iex> KafkaEx.create_worker(:pr) # where :pr is the name of the worker created
{:ok, #PID<0.171.0>}
iex> KafkaEx.create_worker(:pr, uris: [{"localhost", 9092}]) # if no consumer_group is specified, "kafka_ex" is used as the default
{:ok, #PID<0.172.0>}
iex> KafkaEx.create_worker(:pr, [uris: [{"localhost", 9092}], consumer_group: "foo"])
{:ok, #PID<0.173.0>}

earliest_offset(topic, partition, name \\ KafkaEx.Server)

Specs:

  • earliest_offset(binary, integer, atom | pid) :: {atom, %{}}

Get the offset of the earliest message still persistent in Kafka

Example

iex> KafkaEx.earliest_offset("foo", 0)
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}]

fetch(topic, partition, opts \\ [])

Specs:

  • fetch(binary, number, Keyword.t) :: {atom, %{}}

Fetch a set of messages from Kafka from the given topic and partition ID

Optional arguments (keyword list)

  • offset: when supplied, the fetch starts from this offset; otherwise it starts from the last committed offset of the consumer_group the worker belongs to.
  • worker_name: the worker we want to run this fetch request through. Default is KafkaEx.Server
  • wait_time: maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. Default is 10
  • min_bytes: minimum number of bytes of messages that must be available to give a response. If the client sets this to 0, the server always responds immediately; if there is no new data since the last request, the client just gets back empty message sets. If this is set to 1, the server responds as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout, the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting wait_time to 100 and min_bytes to 64000 would allow the server to wait up to 100 ms to try to accumulate 64k of data before responding). Default is 1
  • max_bytes: maximum bytes to include in the message set for this partition. This helps bound the size of the response. Default is 1,000,000
  • auto_commit: specifies whether the last offset should be committed. Default is true

Example

iex> KafkaEx.fetch("foo", 0, offset: 0)
[
  %KafkaEx.Protocol.Fetch.Response{partitions: [
    %{error_code: 0, hw_mark_offset: 1, message_set: [
      %{attributes: 0, crc: 748947812, key: nil, offset: 0, value: "hey foo"}
    ], partition: 0}
  ], topic: "foo"}
]
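
The nested shape of the fetch response above can be flattened with a comprehension. The following is a minimal sketch, not part of KafkaEx: the module name FetchSketch and its functions are hypothetical, and the response is a plain-map stand-in copied from the example so the snippet runs without a broker. Map patterns also match structs, so the same code should apply to the real %KafkaEx.Protocol.Fetch.Response{} values, assuming they keep the fields shown.

```elixir
defmodule FetchSketch do
  # Hypothetical helper: collects every message map from a fetch response
  # shaped like the example (responses -> partitions -> message_set).
  def messages(responses) do
    for %{partitions: partitions} <- responses,
        %{message_set: message_set} <- partitions,
        message <- message_set,
        do: message
  end

  # Offset to request next: one past the last message seen, or the offset
  # we started from when nothing came back.
  def next_offset([], current_offset), do: current_offset
  def next_offset(messages, _current_offset), do: List.last(messages).offset + 1
end

# Stand-in for the value returned by KafkaEx.fetch("foo", 0, offset: 0).
response = [
  %{
    partitions: [
      %{
        error_code: 0,
        hw_mark_offset: 1,
        partition: 0,
        message_set: [
          %{attributes: 0, crc: 748947812, key: nil, offset: 0, value: "hey foo"}
        ]
      }
    ],
    topic: "foo"
  }
]

messages = FetchSketch.messages(response)
values = Enum.map(messages, & &1.value)
next = FetchSketch.next_offset(messages, 0)
```

Passing next as the offset option of the following fetch would continue reading where this response ended.
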
latest_offset(topic, partition, name \\ KafkaEx.Server)

Specs:

  • latest_offset(binary, integer, atom | pid) :: {atom, %{}}

Get the offset of the latest message written to Kafka

Example

iex> KafkaEx.latest_offset("foo", 0)
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offsets: [16], partition: 0}], topic: "foo"}]
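
Pulling the offsets out of such a response takes a little unwrapping. The sketch below is an assumption-laden illustration rather than library code: OffsetSketch is a hypothetical module, and the inputs are plain maps copied from the examples in this reference. Those examples show the offset list under both an offset key and an offsets key, so the helper accepts either.

```elixir
defmodule OffsetSketch do
  # Hypothetical helper: returns the offsets reported for `partition`,
  # or [] when the partition is absent. Accepts either :offsets or :offset
  # as the key, since the documented examples show both.
  def offsets(responses, partition) do
    responses
    |> Enum.flat_map(fn %{partition_offsets: partition_offsets} -> partition_offsets end)
    |> Enum.filter(fn entry -> entry.partition == partition end)
    |> Enum.flat_map(fn entry -> Map.get(entry, :offsets) || Map.get(entry, :offset, []) end)
  end
end

# Stand-ins for the documented return values (no broker needed).
latest = [%{partition_offsets: [%{error_code: 0, offsets: [16], partition: 0}], topic: "foo"}]
earliest = [%{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}]

latest_offsets = OffsetSketch.offsets(latest, 0)
earliest_offsets = OffsetSketch.offsets(earliest, 0)
```
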
metadata(opts \\ [])

Returns metadata for the given topic; if the topic is an empty string, returns metadata for all topics

Optional arguments (keyword list)

  • worker_name: the worker we want to run this metadata request through; when none is provided, the default worker KafkaEx.Server is used
  • topic: name of the topic for which metadata is requested; when none is provided, metadata for all topics is retrieved

Example

iex> KafkaEx.create_worker(:mt)
iex> KafkaEx.metadata(topic: "foo", worker_name: :mt)
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.59.103",
   node_id: 49162, port: 49162, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: 0,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0,
     isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
   topic: "foo"}]}
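
A common use of this response is finding which broker leads a partition. The following sketch assumes the field names shown in the example above (topic_metadatas, partition_metadatas, partition_id, leader); MetadataSketch is a hypothetical module, and the metadata value is a plain-map stand-in so the snippet runs without a broker.

```elixir
defmodule MetadataSketch do
  # Hypothetical helper: returns {:ok, leader_node_id} for the given topic
  # and partition, or :error when the metadata does not mention them.
  def leader(metadata, topic, partition) do
    leaders =
      for %{topic: ^topic, partition_metadatas: partitions} <- metadata.topic_metadatas,
          %{partition_id: ^partition, leader: leader} <- partitions,
          do: leader

    case leaders do
      [leader | _] -> {:ok, leader}
      [] -> :error
    end
  end
end

# Stand-in for the value returned by KafkaEx.metadata(topic: "foo").
metadata = %{
  brokers: [%{host: "192.168.59.103", node_id: 49162, port: 49162, socket: nil}],
  topic_metadatas: [
    %{
      error_code: 0,
      partition_metadatas: [
        %{error_code: 0, isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}
      ],
      topic: "foo"
    }
  ]
}

found = MetadataSketch.leader(metadata, "foo", 0)
missing = MetadataSketch.leader(metadata, "bar", 0)
```
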
offset(topic, partition, time, name \\ KafkaEx.Server)

Specs:

  • offset(binary, number, :calendar.datetime | atom, atom | pid) :: {atom, %{}}

Get the offset of the message sent at the specified date/time

Example

iex> KafkaEx.offset("foo", 0, {{2015, 3, 29}, {23, 56, 40}}) # note: the time specified should match, or be ahead of, the time on the server that Kafka runs on
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [256], partition: 0}], topic: "foo"}]
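
The time argument is an Erlang :calendar.datetime tuple, which can be built by hand as above or derived from standard library values. The snippet below is a small sketch that assumes an Elixir version shipping NaiveDateTime; the KafkaEx call itself is left as a comment because it needs a running broker.

```elixir
# ~N[...] builds a NaiveDateTime; to_erl/1 converts it to the
# {{year, month, day}, {hour, minute, second}} tuple shape.
time = NaiveDateTime.to_erl(~N[2015-03-29 23:56:40])

# Current UTC time in the same shape, straight from Erlang's :calendar.
now = :calendar.universal_time()

# With a running broker, the call would look like this (not executed here):
# KafkaEx.offset("foo", 0, time)
```
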
offset_commit(worker_name, offset_commit_request)

offset_fetch(worker_name, offset_fetch_request)

produce(topic, partition, value, opts \\ [])

Specs:

  • produce(binary, number, binary, Keyword.t) :: :ok | {:ok, %{}}

Produces messages to Kafka logs

Optional arguments (keyword list)

  • worker_name: the worker we want to run this produce request through; when none is provided, the default worker KafkaEx.Server is used
  • key: used for partition assignment; can be nil, and defaults to nil when none is provided
  • require_acks: indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1, the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas). Default is 0
  • timeout: maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks (the require_acks option). Default is 100 milliseconds

Example

iex> KafkaEx.produce("bar", 0, "hey")
:ok
iex> KafkaEx.produce("foo", 0, "hey", [worker_name: :pr, require_acks: 1])
[%KafkaEx.Protocol.Produce.Response{partitions: [%{error_code: 0, offset: 75, partition: 0}], topic: "foo"}]
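
The require_acks values described in the option list can be hard to remember at call sites. As a sketch only, the hypothetical AckSketch module below gives the three special values readable names and builds an opts keyword list; the names :none, :leader and :all are illustrative assumptions, not part of KafkaEx.

```elixir
defmodule AckSketch do
  # Hypothetical helper: maps a readable durability level to the
  # require_acks value described in the option list.
  def require_acks(:none), do: 0    # server sends no response
  def require_acks(:leader), do: 1  # wait for the write to the local log
  def require_acks(:all), do: -1    # wait for all in-sync replicas

  # Builds an opts keyword list using the documented option names.
  def produce_opts(level, timeout_ms \\ 100) do
    [require_acks: require_acks(level), timeout: timeout_ms]
  end
end

opts = AckSketch.produce_opts(:leader)

# With a running broker, the call would look like this (not executed here):
# KafkaEx.produce("foo", 0, "hey", opts)
```
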
start(type, args)

Callback implementation of :application.start/2.

stop_streaming(opts \\ [])

stream(topic, partition, opts \\ [])

Returns a stream that consumes fetched messages. This puts the specified worker in streaming mode and blocks the worker indefinitely. The handler is a normal GenEvent handler, so you can supply a custom one; otherwise a default handler is used.

Use this function with care: the queue is unbounded and can cause out-of-memory (OOM) failures.

Optional arguments (keyword list)

  • worker_name: the worker we want to run this stream request through; when none is provided, the default worker KafkaEx.Server is used
  • offset: offset to begin this fetch from; when none is provided, 0 is assumed
  • handler: the handler we want to handle the streaming events; when none is provided, the default KafkaExHandler is used
  • auto_commit: specifies whether the last offset should be committed. Default is true

Example

iex> KafkaEx.create_worker(:stream, [uris: [{"localhost", 9092}]])
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream)
:ok
iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream)
:ok
iex> KafkaEx.stream("foo", 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
 %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
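
Because the stream never ends on its own, any pipeline built on it should stay lazy and finish with a bounded step such as Enum.take. The sketch below illustrates that pattern with a stand-in: fake_stream is a hypothetical endless stream of message maps built with Stream.iterate, used in place of KafkaEx.stream("foo", 0) so the snippet runs without a broker.

```elixir
# Stand-in for KafkaEx.stream("foo", 0): an endless lazy stream of
# message maps with increasing offsets.
fake_stream =
  Stream.iterate(0, fn offset -> offset + 1 end)
  |> Stream.map(fn offset -> %{key: nil, offset: offset, value: "msg-#{offset}"} end)

# Stream.filter and Stream.map stay lazy; Enum.take(3) bounds the work,
# so only as many messages as needed are pulled from the source.
values =
  fake_stream
  |> Stream.filter(fn message -> rem(message.offset, 2) == 0 end)
  |> Stream.map(fn message -> message.value end)
  |> Enum.take(3)
```

Swapping an Enum function in for one of the Stream steps would try to consume the whole stream before moving on, which on an endless source never returns.
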