KafkaEx

Summary

create_worker(name)

Creates a KafkaEx worker using the broker list supplied in config

create_worker(uris, name)

Creates a KafkaEx worker using the given list of broker URIs

earliest_offset(topic, partition, name \\ KafkaEx.Server)

Get the offset of the earliest message still persisted in Kafka

fetch(topic, partition, offset, name \\ KafkaEx.Server, wait_time \\ 10, min_bytes \\ 1, max_bytes \\ 1000000)

Fetch a set of messages from Kafka from the given topic, partition ID, and offset

latest_offset(topic, partition, name \\ KafkaEx.Server)

Get the offset of the latest message written to Kafka

metadata(topic \\ "", name \\ KafkaEx.Server)

Return metadata for the given topic; returns metadata for all topics if topic is an empty string

offset(topic, partition, time, name \\ KafkaEx.Server)

Get the offset of the message sent at the specified date/time

produce(topic, partition, value, name \\ KafkaEx.Server, key \\ nil, required_acks \\ 0, timeout \\ 100)

Produces messages to Kafka logs

start(type, args)

Callback implementation of :application.start/2

stream(topic, partition, name \\ KafkaEx.Server, offset \\ 0, handler \\ KafkaExHandler)

Returns a stream that consumes fetched messages. This puts the specified worker in streaming mode and blocks the worker indefinitely. The handler is a normal GenEvent handler, so you can supply a custom one; otherwise a default handler is used

Types

datetime :: {{pos_integer, pos_integer, pos_integer}, {pos_integer, pos_integer, pos_integer}}

uri :: [{binary | char_list, number}]

Functions

create_worker(name)

Creates a KafkaEx worker using the broker list supplied in config

Example

iex> KafkaEx.create_worker(:pr)
{:ok, #PID<0.171.0>}

create_worker(uris, name)

Creates a KafkaEx worker using the given list of broker URIs

Example

iex> KafkaEx.create_worker([{"localhost", 9092}], :pr)
{:ok, #PID<0.171.0>}

earliest_offset(topic, partition, name \\ KafkaEx.Server)

Get the offset of the earliest message still persisted in Kafka

Example

iex> KafkaEx.earliest_offset("foo", 0, :mt)
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [0]}}}}

fetch(topic, partition, offset, name \\ KafkaEx.Server, wait_time \\ 10, min_bytes \\ 1, max_bytes \\ 1000000)

Specs:

  • fetch(binary, number, number, atom, number, number, number) :: any

Fetch a set of messages from Kafka from the given topic, partition ID, and offset

Example

iex> KafkaEx.fetch("food", 0, 0)
{:ok,
 %{"food" => %{0 => %{error_code: 0, hw_mark_offset: 133,
       message_set: [%{attributes: 0, crc: 4264455069, key: nil, offset: 0,
          value: "hey"},
        %{attributes: 0, crc: 4264455069, key: nil, offset: 1, value: "hey"},
...]}}}}
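
The nested response shown in the example can be unpacked with ordinary pattern matching. The following is a minimal sketch, assuming the response shape from that example; the `response` literal is copied from the example output rather than obtained from a live broker, and the variable names are illustrative only.

```elixir
# Response literal copied from the fetch example output (truncated to two messages).
# With a running broker it would come from KafkaEx.fetch("food", 0, 0).
response =
  {:ok,
   %{"food" => %{0 => %{error_code: 0, hw_mark_offset: 133,
       message_set: [
         %{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
         %{attributes: 0, crc: 4264455069, key: nil, offset: 1, value: "hey"}
       ]}}}}

# Match on the topic name and partition ID to reach the partition map.
{:ok, %{"food" => %{0 => partition}}} = response

# Collect the message payloads and compute the offset to request next.
values = Enum.map(partition.message_set, fn message -> message.value end)
next_offset = List.last(partition.message_set).offset + 1
```

Passing `next_offset` as the offset argument of a later fetch call would continue reading after the last message received, assuming the partition returned a non-empty message set.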

latest_offset(topic, partition, name \\ KafkaEx.Server)

Get the offset of the latest message written to Kafka

Example

iex> KafkaEx.latest_offset("foo", 0, :mt)
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [16]}}}}

metadata(topic \\ "", name \\ KafkaEx.Server)

Return metadata for the given topic; returns metadata for all topics if topic is an empty string

Example

iex> KafkaEx.create_worker(:mt)
iex> KafkaEx.metadata("foo", :mt)
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]},
        1 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]},
        2 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]},
        3 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]},
        4 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}
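
A common use of this map is finding the broker that leads a given partition. The following is a rough sketch, assuming the metadata shape shown in the example for this function; the `metadata` literal is abbreviated from that output (one partition kept) and the variable names are illustrative only.

```elixir
# Metadata literal abbreviated from the metadata example output.
# With a running broker it would come from KafkaEx.metadata("foo", :mt).
metadata = %{
  brokers: %{1 => {"localhost", 9092}},
  topics: %{
    "foo" => %{
      error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}
    }
  }
}

# Look up the leader's broker ID for partition 0 of "foo",
# then resolve that ID to a host and port via the brokers map.
leader_id = metadata.topics["foo"].partitions[0].leader
{host, port} = metadata.brokers[leader_id]
```

Here `leader_id` is the key into `brokers`, so the lookup resolves to the `{"localhost", 9092}` entry from the example.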

offset(topic, partition, time, name \\ KafkaEx.Server)

Get the offset of the message sent at the specified date/time
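
This function has no example of its own, so the following sketch shows one way a value of the `datetime` type from the Types section might be built and passed in. `OffsetExample`, `seconds_ago/1`, and `offset_an_hour_ago/0` are hypothetical names, not part of KafkaEx; the worker name `:mt` and topic `"foo"` are borrowed from the other examples. Whether the time is interpreted as UTC or local time is not stated here, so the use of UTC is an assumption.

```elixir
defmodule OffsetExample do
  # Hypothetical helper, not part of KafkaEx: returns a
  # {{year, month, day}, {hour, minute, second}} tuple for the moment
  # `seconds` before now (UTC), using Erlang's :calendar module.
  def seconds_ago(seconds) do
    now = :calendar.datetime_to_gregorian_seconds(:calendar.universal_time())
    :calendar.gregorian_seconds_to_datetime(now - seconds)
  end

  # Assumes a worker named :mt is running and topic "foo" has a partition 0.
  def offset_an_hour_ago do
    KafkaEx.offset("foo", 0, seconds_ago(3600), :mt)
  end
end
```

Calling `OffsetExample.offset_an_hour_ago()` requires a reachable broker; `OffsetExample.seconds_ago/1` can be tried on its own in iex.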

produce(topic, partition, value, name \\ KafkaEx.Server, key \\ nil, required_acks \\ 0, timeout \\ 100)

Specs:

  • produce(binary, number, binary, atom, binary, number, number) :: any

Produces messages to Kafka logs

Example

iex> KafkaEx.produce("food", 0, "hey")
:ok
iex> KafkaEx.produce("foo", 0, "hey", :pr, nil, 1)
{:ok, %{"foo" => %{0 => %{error_code: 0, offset: 15}}}}

start(type, args)

Callback implementation of :application.start/2.

stream(topic, partition, name \\ KafkaEx.Server, offset \\ 0, handler \\ KafkaExHandler)

Returns a stream that consumes fetched messages. This puts the specified worker in streaming mode and blocks the worker indefinitely. The handler is a normal GenEvent handler, so you can supply a custom one; otherwise a default handler is used.

Use this function with care: the queue is unbounded and can exhaust memory (OOM).

Example

iex> KafkaEx.create_worker([{"localhost", 9092}], :stream)
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", :stream)
:ok
iex> KafkaEx.produce("foo", 0, "hi", :stream)
:ok
iex> KafkaEx.stream("foo", 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
 %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]