Kayrock-Based KafkaEx Client

This document contains information on using the new Kayrock-based KafkaEx client.

The client itself is defined in the module KafkaEx.New.Client. This will be the primary KafkaEx client module going forward.

NOTE In many places below we recommend using "version N and up". In practice, the KafkaEx legacy API compatibility layer often supports only versions 0 through 3. Using version 3 should generally be safe and achieve the desired outcomes. The new API will be designed to handle newer versions.

Contents:

- Using the New Client
- Common Use Case - Store Offsets In Kafka
- Common Use Case - Message Timestamps / New Storage Format

Using the New Client

To use the new client in your project, set kafka_version: "kayrock" in your config file:

config :kafka_ex,
  kafka_version: "kayrock"

If you want to start a single client instance, supply the new client's module name to the :server_impl option of KafkaEx.start_link_worker/2:

{:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

The new client should be fully backwards compatible with the legacy API; simply use KafkaEx like you normally would. It is also compatible with the new API defined in KafkaEx.New.KafkaExAPI.
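Both API styles can be driven from the same client process. Here is a minimal sketch, assuming a reachable broker; the topic name is hypothetical, and latest_offset/3 is one of the functions exposed by KafkaEx.New.KafkaExAPI:

{:ok, client} = KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)

# legacy API calls work unchanged when pointed at the new client
%KafkaEx.Protocol.Metadata.Response{} = KafkaEx.metadata(worker_name: client)

# the same pid works with the new API
# ("some-topic" is a hypothetical topic name)
{:ok, offset} = KafkaEx.New.KafkaExAPI.latest_offset(client, "some-topic", 0)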

Common Use Case - Store Offsets In Kafka

Offsets are stored in Kafka (instead of Zookeeper) with offset commit message version 1 and up (Kafka v0.10 and up). Message version 1 also includes a timestamp parameter that was dropped in version 2, so we recommend using at least version 2. We often use version 3 for convenience because that version achieves the desired results for this and for other Kafka API messages.

To retrieve offsets committed with version 1 and up, you must also use version 1 and up of the offset fetch request. A safe move here is to always use the same version for offset commit and offset fetch.

WARNING - OFFSET LOSS

If offsets for a consumer group are stored in Zookeeper (v0, the KafkaEx legacy default), they are unavailable using v1 and up of the offset fetch message. This means that if you have an existing KafkaEx consumer group and "upgrade" to the new version, you will lose offsets. To avoid losing offsets, first convert the Zookeeper offsets to Kafka storage. This can be done with command line tools (documentation of which is beyond the scope of this document) or with KafkaEx itself by fetching the offsets using v0 and then committing them using v1, as sketched below. We may provide a tool for this in the future.

Likewise, once you store offsets in Kafka, they cannot be fetched using v0 of the offset fetch message. If you need to "roll back" storage from Kafka to Zookeeper, you will need to convert the offsets first.
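For illustration, here is a minimal sketch of the Zookeeper-to-Kafka conversion described above, assuming client, consumer_group, and topic are already bound and the topic has a single partition 0; the "roll back" direction is the same with the api_version values swapped:

alias KafkaEx.Protocol.OffsetCommit
alias KafkaEx.Protocol.OffsetFetch

# fetch the offset currently stored in Zookeeper (v0)
[resp] =
  KafkaEx.offset_fetch(client, %OffsetFetch.Request{
    topic: topic,
    consumer_group: consumer_group,
    partition: 0,
    api_version: 0
  })

%OffsetFetch.Response{partitions: [%{offset: offset}]} = resp

# re-commit the same offset using Kafka storage (v1 and up)
KafkaEx.offset_commit(client, %OffsetCommit.Request{
  consumer_group: consumer_group,
  topic: topic,
  partition: 0,
  offset: offset,
  api_version: 1
})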

Examples

KafkaEx.offset_commit/2 and KafkaEx.offset_fetch/2 support setting the api version via the api_version field of their corresponding request structs:

alias KafkaEx.Protocol.OffsetCommit
alias KafkaEx.Protocol.OffsetFetch

# commit offsets using kafka storage
KafkaEx.offset_commit(client, %OffsetCommit.Request{
  consumer_group: consumer_group,
  topic: topic,
  partition: 0,
  offset: offset,
  api_version: 3
})

# fetch an offset stored in kafka
[resp] =
  KafkaEx.offset_fetch(client, %OffsetFetch.Request{
    topic: topic,
    consumer_group: consumer_group,
    partition: 0,
    api_version: 3
  })

%OffsetFetch.Response{partitions: [%{offset: offset}]} = resp

When using KafkaEx.fetch/3 with auto_commit: true, you can specify the offset_commit_api_version option to control how offsets are stored:

# store auto-committed offsets in kafka
KafkaEx.fetch(topic, partition, auto_commit: true, offset_commit_api_version: 3)

When using KafkaEx.ConsumerGroup, you can control offset storage using the api_versions option:

# use kafka offset storage with consumer groups
# NOTE you must use compatible versions for offset_fetch and offset_commit;
#   using the same value for both should be safe
KafkaEx.ConsumerGroup.start_link(
  MyConsumer,
  consumer_group_name,
  [topic],
  api_versions: %{offset_fetch: 3, offset_commit: 3}
)

Common Use Case - Message Timestamps / New Storage Format

Message timestamps and the new message storage format go hand-in-hand because they both require setting the message versions for produce and fetch.

Timestamps were added in v1 of the produce/fetch messages, but the storage format was replaced in v2 (around Kafka v0.10), so we recommend using 2 and up (3 is safe).

WARNING - Broker Performance

Check with your system administrator. If the broker is configured to use the new message format (v2 and up), producing or fetching messages in the old formats (v0 and v1) can put significant load on the brokers because they must convert messages between versions on every request. If you are running a relatively modern version of Kafka, we recommend using version 3 for both messages.

Examples

Whenever we use produce API version >= 2, the new message format is used automatically.

To publish a message with a timestamp:

{:ok, offset} =
  KafkaEx.produce(
    topic,
    0,
    msg,
    worker_name: client,
    required_acks: 1,
    timestamp: 12345,
    api_version: 3
  )

fetch_responses =
  KafkaEx.fetch(topic, 0,
    offset: offset,
    auto_commit: false,
    worker_name: client,
    api_version: 3
  )

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

message.value # => msg
message.offset # => offset
message.timestamp # => 12345

If a topic has the message.timestamp.type setting set to LogAppendTime, then timestamps will be populated automatically when a produced message is received by the broker and appended to the log.

fetch_responses =
  KafkaEx.fetch(topic_with_log_append_timestamps, 0,
    offset: offset,
    auto_commit: false,
    worker_name: client,
    api_version: 3
  )

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

message.timestamp # => log append timestamp in milliseconds
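
To experiment with this, you can create a topic with message.timestamp.type set to LogAppendTime from KafkaEx itself. Here is a sketch using KafkaEx.create_topics/2, with the request shown as a plain map and a hypothetical topic name; verify the exact request shape against your KafkaEx version:

# create a topic whose message timestamps are assigned by the broker
request = %{
  topic: "log-append-timestamps-example", # hypothetical topic name
  num_partitions: 1,
  replication_factor: 1,
  replica_assignment: [],
  config_entries: [
    %{config_name: "message.timestamp.type", config_value: "LogAppendTime"}
  ]
}

KafkaEx.create_topics([request], worker_name: client, timeout: 10_000)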

Note that the KafkaEx.ConsumerGroup api_versions option also supports setting a version for fetch:

# use new record batch format AND kafka offset storage with consumer groups
KafkaEx.ConsumerGroup.start_link(
  MyConsumer,
  consumer_group_name,
  [topic],
  api_versions: %{offset_fetch: 3, offset_commit: 3, fetch: 3}
)