Upgrading to KafkaEx 1.0

Overview

KafkaEx 1.0 brings a cleaner API, removes legacy code, and uses Kayrock as the sole protocol implementation. This guide helps you migrate from KafkaEx 0.x.

Breaking Changes

Removed Legacy Servers

The following server implementations have been removed:

  • KafkaEx.Server0P8P0
  • KafkaEx.Server0P8P2
  • KafkaEx.Server0P9P0
  • KafkaEx.Server0P10AndLater

Kayrock is now the only implementation, providing automatic API version negotiation.

Configuration Changes

Removed options:

  • kafka_version - No longer needed; the client automatically negotiates versions

Update your config:

# Before (0.x)
config :kafka_ex,
  kafka_version: "kayrock",
  brokers: [{"localhost", 9092}]

# After (1.0)
config :kafka_ex,
  brokers: [{"localhost", 9092}]

Module Reorganization

Modules have been reorganized by domain:

| Old Module | New Module |
| --- | --- |
| KafkaEx.GenConsumer | KafkaEx.Consumer.GenConsumer |
| KafkaEx.ConsumerGroup | KafkaEx.Consumer.ConsumerGroup |
| KafkaEx.New.Client | KafkaEx.Client |
| KafkaEx.New.KafkaExAPI | KafkaEx.API |
| KafkaEx.New.Kafka.* | KafkaEx.Messages.* |

API Changes

New explicit client API:

# Before (0.x) - implicit worker
KafkaEx.produce("topic", 0, "message")
KafkaEx.fetch("topic", 0, offset: 0)  # offset is a keyword option

# After (1.0) - explicit client
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])
{:ok, _} = KafkaEx.API.produce(client, "topic", 0, [%{value: "message"}])
{:ok, result} = KafkaEx.API.fetch(client, "topic", 0, 0)  # offset is positional

Headers API — [%Header{}] instead of [{key, value}]

The headers: option on every produce function now takes a list of %KafkaEx.Messages.Header{} structs instead of {key, value} tuples. This is a runtime breaking change: code that still passes tuples compiles cleanly and fails only on the first produce call, with a FunctionClauseError. Migrate before upgrading in production.

# Before (0.x / rc.2)
KafkaEx.API.produce(client, "t", 0, [
  %{value: "v", headers: [{"trace-id", "abc"}, {"tenant", "prod"}]}
])

# After (1.0)
alias KafkaEx.Messages.Header
KafkaEx.API.produce(client, "t", 0, [
  %{value: "v", headers: [
    Header.new("trace-id", "abc"),
    Header.new("tenant", "prod")
  ]}
])

Why: the fetch path was already returning %Header{} structs. The produce side was the asymmetric outlier; a single consistent shape across produce and fetch makes round-trip code cleaner.
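For call sites that build headers dynamically, a small normalizing helper can ease the transition. The sketch below is not part of kafka_ex: MyApp.HeaderMigration and normalize/2 are hypothetical names, and the constructor is passed in as an argument so the module runs without kafka_ex loaded. In an application you would pass &KafkaEx.Messages.Header.new/2.

```elixir
defmodule MyApp.HeaderMigration do
  # Hypothetical helper (not part of kafka_ex).
  # Converts legacy {key, value} tuples with `build` and leaves anything else
  # (for example values that are already header structs) untouched.
  def normalize(headers, build) when is_list(headers) and is_function(build, 2) do
    Enum.map(headers, fn
      {key, value} when is_binary(key) and is_binary(value) -> build.(key, value)
      other -> other
    end)
  end
end

# Stand-in constructor so the example runs on its own;
# with kafka_ex loaded, pass &KafkaEx.Messages.Header.new/2 instead.
build = fn key, value -> %{key: key, value: value} end

MyApp.HeaderMigration.normalize([{"trace-id", "abc"}, {"tenant", "prod"}], build)
```

Because non-tuple entries pass through unchanged, the helper is safe to leave in place after every caller has been migrated.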

Broker version requirements

  • Minimum: Kafka 0.11.0+ — required for RecordBatch format, headers, and timestamps. Earlier brokers will fail at produce.
  • Tested: Kafka 2.1.0 through 3.8.x.
  • Kafka 2.3+ recommended — needed for KIP-394 two-step JoinGroup semantics with group.initial.rebalance.delay.ms. kafka_ex auto-handles the two-step dance, but broker support is required.
  • Kafka 4.0+ — partial compatibility; tracked in #497. Consumer groups may hit protocol changes.

Optional dependency matrix

Some features require additional deps in your app's mix.exs. If you configure a feature without the backing dep, you'll get an UndefinedFunctionError at runtime (not at startup).

| Feature | Required dep |
| --- | --- |
| Snappy compression | {:snappyer, "~> 1.2"} |
| Zstd compression | {:ezstd, "~> 1.0"} |
| LZ4 compression | {:lz4b, "~> 0.0.13"} |
| MSK-IAM SASL | {:jason, "~> 1.0"}, {:aws_signature, "~> 0.4"}, {:aws_credentials, "~> 1.0"} |
| OAuth JWT parsing | user's choice, e.g. {:joken, "~> 2.6"}; only needed if your token_provider parses JWTs |
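Since a missing optional dep surfaces only on first use, one option is to check for the backing modules when your application boots. The following is a minimal sketch, not a kafka_ex feature: MyApp.DepCheck is a hypothetical module, and the module names in the commented call assume that the snappyer and ezstd packages expose Erlang modules of the same name.

```elixir
defmodule MyApp.DepCheck do
  # Hypothetical boot-time guard (not part of kafka_ex).
  # Returns the modules from `required` that cannot be loaded.
  def missing(required) when is_list(required) do
    Enum.reject(required, &Code.ensure_loaded?/1)
  end

  # Raises early, e.g. from Application.start/2, instead of failing on first use.
  def ensure!(required) do
    case missing(required) do
      [] -> :ok
      mods -> raise "missing optional dependencies: #{inspect(mods)}"
    end
  end
end

# Assumed module names; adjust to the features you actually configure.
# MyApp.DepCheck.ensure!([:snappyer, :ezstd])
```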

0.x → 1.0 API cheat-sheet

| 0.x | 1.0 |
| --- | --- |
| KafkaEx.produce("t", 0, "m") | KafkaEx.API.produce_one(client, "t", 0, "m") |
| KafkaEx.fetch("t", 0, offset: 0) | KafkaEx.API.fetch(client, "t", 0, 0) |
| KafkaEx.GenConsumer | KafkaEx.Consumer.GenConsumer |
| KafkaEx.ConsumerGroup | KafkaEx.Consumer.ConsumerGroup |
| config :kafka_ex, kafka_version: "kayrock" | (remove; no longer needed) |
| headers: [{"k", "v"}] on produce | headers: [Header.new("k", "v")] |

OffsetCommit error handling (new in 1.0)

In earlier kafka_ex, :illegal_generation and related errors were logged and swallowed — the consumer kept running on a stale generation until the next heartbeat happened to also fail.

v1.0 classifies OffsetCommit errors across three paths, matching the reference Kafka clients (Java, librdkafka, brod, kafka-python):

  • Terminal (:fenced_instance_id, :group_authorization_failed, :topic_authorization_failed, :offset_metadata_too_large, :invalid_commit_offset_size) — consumer stops without rejoining. Under restart: :transient the supervisor does not respawn.
  • Fatal (:illegal_generation, :unknown_member_id) — GenConsumer casts {:rejoin_required, reason, stale_gen} to the group manager and self-stops. The manager resets member_id/generation_id and runs a rebalance. Duplicate casts from sibling partitions coalesce in the manager's mailbox.
  • Retryable (:rebalance_in_progress, :unstable_offset_commit, :timeout, :coordinator_not_available, …) — commit is retried with exponential backoff.
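The three paths can be modelled as a lookup, which is handy when tagging metrics or alerts by severity. This is an illustrative sketch only, not kafka_ex's internal classifier: MyApp.CommitErrorClass is a hypothetical module, and it covers just the error atoms named above, so anything else (including the retryable errors elided by the "…") comes back as :unlisted.

```elixir
defmodule MyApp.CommitErrorClass do
  # Hypothetical helper mirroring the documented classification.
  @terminal [
    :fenced_instance_id,
    :group_authorization_failed,
    :topic_authorization_failed,
    :offset_metadata_too_large,
    :invalid_commit_offset_size
  ]
  @fatal [:illegal_generation, :unknown_member_id]
  @retryable [
    :rebalance_in_progress,
    :unstable_offset_commit,
    :timeout,
    :coordinator_not_available
  ]

  def classify(error) when error in @terminal, do: :terminal
  def classify(error) when error in @fatal, do: :fatal
  def classify(error) when error in @retryable, do: :retryable
  # Errors not named in this guide are deliberately not guessed at.
  def classify(_error), do: :unlisted
end

MyApp.CommitErrorClass.classify(:illegal_generation)
# => :fatal
```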

No user callback is invoked — kafka_ex v1 does not have a synchronous handle_commit_failure/3 behaviour (deferred post-1.0). Subscribe to the new telemetry event to observe failures:

require Logger

:telemetry.attach(
  "my-commit-failure-observer",
  [:kafka_ex, :consumer, :commit_failed],
  fn _event, %{count: 1}, metadata, _ ->
    # metadata: %{group_id, topic, partition, offset, kind, error}
    Logger.warning("Commit failed: #{inspect(metadata)}")
  end,
  nil
)

At-least-once semantics are preserved: any uncommitted messages since the last successful commit will be redelivered after the rejoin, so your handle_message_set/2 must be idempotent (or tolerate duplicates).
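One common way to tolerate redelivery is to remember the highest offset already processed per partition and skip anything at or below it. The sketch below shows only that filtering step. MyApp.Dedup is a hypothetical helper, and it assumes each message is a map or struct with an integer :offset field. Because the consumer process stops on a fatal commit error, the last processed offset would have to live somewhere durable (for example alongside the side effect in your database) rather than in consumer state.

```elixir
defmodule MyApp.Dedup do
  # Hypothetical helper (not part of kafka_ex).
  # `last_offset` is the highest offset already processed for this partition
  # (use -1 when nothing has been processed yet).
  # Returns {messages_still_to_process, new_last_offset}.
  def drop_seen(messages, last_offset) when is_integer(last_offset) do
    fresh = Enum.filter(messages, fn %{offset: offset} -> offset > last_offset end)

    new_last =
      Enum.reduce(fresh, last_offset, fn %{offset: offset}, acc -> max(offset, acc) end)

    {fresh, new_last}
  end
end

# Offset 41 was processed before the rejoin, then redelivered with 42 and 43.
redelivered = [%{offset: 41, value: "a"}, %{offset: 42, value: "b"}, %{offset: 43, value: "c"}]
{fresh, last} = MyApp.Dedup.drop_seen(redelivered, 41)
```

Here fresh holds the messages at offsets 42 and 43, and last is 43.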

GenConsumer Changes

# Before (0.x)
defmodule MyConsumer do
  use KafkaEx.GenConsumer
  # ...
end

# After (1.0)
defmodule MyConsumer do
  use KafkaEx.Consumer.GenConsumer
  # ...
end

ConsumerGroup Changes

# Before (0.x)
KafkaEx.ConsumerGroup.start_link(
  MyConsumer, "my-group", ["topic"],
  # ...
)

# After (1.0)
KafkaEx.Consumer.ConsumerGroup.start_link(
  MyConsumer, "my-group", ["topic"],
  # ...
)

Deprecations

The following functions and modules are deprecated in v1.0 and scheduled for removal in v2.0. They continue to work in the entire 1.x series — plan migration at your convenience.

| Deprecated | Replacement | Notes |
| --- | --- | --- |
| KafkaEx.Config.consumer_group/0 | KafkaEx.Config.default_consumer_group/0 | Function-for-function swap. |
| KafkaEx.Client.State.max_supported_api_version/3 | KafkaEx.Client.State.max_supported_api_version/2 | Drop the default arg and match on {:ok, vsn} / {:error, :api_not_supported_by_broker}. |
| KafkaEx.Producer.Partitioner.Legacy | KafkaEx.Producer.Partitioner.Default | See KafkaEx.Producer.Partitioner moduledoc. |

Each of these emits an Elixir compile-time @deprecated warning — mix compile --warnings-as-errors will flag the first call site.

Migration Checklist

  • [ ] Remove kafka_version from config
  • [ ] Update KafkaEx.GenConsumer to KafkaEx.Consumer.GenConsumer (required - code will not compile)
  • [ ] Update KafkaEx.ConsumerGroup to KafkaEx.Consumer.ConsumerGroup (required - code will not compile)
  • [ ] Update code to use KafkaEx.API functions (optional but recommended)
  • [ ] Update any references to KafkaEx.New.* modules
  • [ ] If you depend on specific protocol versions, add api_versions to config (see API Version Resolution below)
  • [ ] Run tests and fix deprecation warnings
  • [ ] Verify with your Kafka cluster

Important: Old module names (KafkaEx.GenConsumer, KafkaEx.ConsumerGroup, etc.) are not aliased. Code using old module names will fail to compile immediately. All references must be updated.

New Features in 1.0

Explicit Client API

The new KafkaEx.API module provides explicit, client-based functions:

{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])

# Produce
{:ok, metadata} = KafkaEx.API.produce_one(client, "topic", 0, "value")

# Fetch
{:ok, result} = KafkaEx.API.fetch(client, "topic", 0, 0)

# Offsets
{:ok, offset} = KafkaEx.API.latest_offset(client, "topic", 0)
{:ok, _} = KafkaEx.API.commit_offset(client, "group", "topic", [%{partition_num: 0, offset: offset}])

# Topic management
{:ok, _} = KafkaEx.API.create_topic(client, "new-topic", num_partitions: 3)

API Version Resolution

The client now uses the highest protocol version supported by both the broker and the protocol library by default. Previous versions used conservative hardcoded defaults (e.g., fetch v3, produce v3) even when the broker supported higher versions.

If you need to pin specific API versions — for example, to match previous behavior or work around broker-specific issues — use the new api_versions application config:

config :kafka_ex,
  api_versions: %{
    fetch: 3,
    produce: 3,
    metadata: 1
  }

Version selection follows this priority order:

  1. Per-request :api_version option (highest priority)
  2. Application config api_versions map
  3. Broker-negotiated max (default)
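The priority order can be modelled as a small pure function. This is an illustrative model, not the client's request builder: MyApp.VersionResolution and resolve/4 are hypothetical, and the sketch ignores any validation the real client may perform against the broker's supported range.

```elixir
defmodule MyApp.VersionResolution do
  # Illustrative model of the documented priority order.
  #   api          - API name, e.g. :fetch
  #   request_opts - keyword options passed on a single request
  #   app_versions - the api_versions map from application config
  #   broker_max   - highest version negotiated with the broker
  def resolve(api, request_opts, app_versions, broker_max)
      when is_atom(api) and is_list(request_opts) and is_map(app_versions) do
    case Keyword.fetch(request_opts, :api_version) do
      # 1. A per-request option wins.
      {:ok, version} -> version
      # 2. Application config, else 3. the broker-negotiated max.
      :error -> Map.get(app_versions, api, broker_max)
    end
  end
end

app_versions = %{fetch: 3, produce: 3, metadata: 1}

MyApp.VersionResolution.resolve(:fetch, [api_version: 5], app_versions, 11)
# => 5 (per-request option)
MyApp.VersionResolution.resolve(:fetch, [], app_versions, 11)
# => 3 (application config)
MyApp.VersionResolution.resolve(:list_offsets, [], app_versions, 5)
# => 5 (broker-negotiated max)
```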

The GenConsumer / ConsumerGroup :api_versions supervisor option continues to work for per-consumer-group overrides. Application config is no longer read by GenConsumer directly — it is handled centrally by the client's request builder.

latest_offset/4 and earliest_offset/4 no longer force list_offsets v1. They use the standard version resolution like all other API calls.

Telemetry & Observability

Built-in telemetry support for monitoring connections, requests, and consumer operations:

:telemetry.attach(
  "kafka-handler",
  [:kafka_ex, :request, :stop],
  &MyApp.handle_event/4,
  nil
)
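A minimal sketch of what MyApp.handle_event/4 could look like follows. It assumes the :stop event reports a :duration measurement in native time units, as events emitted through :telemetry.span/3 do; the README's event reference is the authority on the actual measurements and metadata. The format/3 helper is a hypothetical name introduced for this example.

```elixir
defmodule MyApp do
  require Logger

  # Matches the 4-arity handler shape that :telemetry.attach/4 expects.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Builds the log line. Assumption: :duration, when present, is in native
  # time units. Any other measurements are printed as-is.
  def format(event, measurements, metadata) do
    name = Enum.join(event, ".")

    case measurements do
      %{duration: native} when is_integer(native) ->
        ms = System.convert_time_unit(native, :native, :millisecond)
        "#{name} took #{ms}ms #{inspect(metadata)}"

      _other ->
        "#{name} #{inspect(measurements)} #{inspect(metadata)}"
    end
  end
end
```

Keeping the handler a named module function, rather than an anonymous one, follows the usual :telemetry recommendation for handler performance.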

See README.md for complete event reference and setup examples.

Compression Support

Support for multiple compression formats on a per-request basis:

# Gzip compression (built-in)
{:ok, _} = KafkaEx.API.produce(client, "topic", 0, messages, compression: :gzip)

# Supported: :gzip, :snappy, :lz4, :zstd

See README.md for details on all compression formats.

SASL Authentication

Full SASL support including PLAIN, SCRAM-SHA-256/512, OAUTHBEARER, and AWS MSK IAM:

# SCRAM example
config :kafka_ex,
  brokers: [{"localhost", 9292}],
  use_ssl: true,
  sasl: %{
    mechanism: :scram,
    username: "user",
    password: "pass",
    mechanism_opts: %{algo: :sha256}
  }

See AUTH.md for complete configuration examples for all authentication mechanisms.

Getting Help