View Source Upgrading to KafkaEx 1.0

Overview

KafkaEx 1.0 brings a cleaner API, removes legacy code, and uses Kayrock as the sole protocol implementation. This guide helps you migrate from KafkaEx 0.x.

Breaking Changes

Removed Legacy Servers

The following server implementations have been removed:

  • KafkaEx.Server0P8P0
  • KafkaEx.Server0P8P2
  • KafkaEx.Server0P9P0
  • KafkaEx.Server0P10AndLater

Kayrock is now the only implementation, providing automatic API version negotiation.

Configuration Changes

Removed options:

  • kafka_version - No longer needed; the client automatically negotiates versions

Update your config:

# Before (0.x)
config :kafka_ex,
  kafka_version: "kayrock",
  brokers: [{"localhost", 9092}]

# After (1.0)
config :kafka_ex,
  brokers: [{"localhost", 9092}]

Module Reorganization

Modules have been reorganized by domain:

Old ModuleNew Module
KafkaEx.GenConsumerKafkaEx.Consumer.GenConsumer
KafkaEx.ConsumerGroupKafkaEx.Consumer.ConsumerGroup
KafkaEx.New.ClientKafkaEx.Client
KafkaEx.New.KafkaExAPIKafkaEx.API
KafkaEx.New.Kafka.*KafkaEx.Messages.*

API Changes

New explicit client API:

# Before (0.x) - implicit worker
KafkaEx.produce("topic", 0, "message")
KafkaEx.fetch("topic", 0, 0)  # offset is positional

# After (1.0) - explicit client
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])
{:ok, _} = KafkaEx.API.produce(client, "topic", 0, [%{value: "message"}])
{:ok, result} = KafkaEx.API.fetch(client, "topic", 0, 0)

GenConsumer Changes

# Before (0.x)
defmodule MyConsumer do
  use KafkaEx.GenConsumer
  # ...
end

# After (1.0)
defmodule MyConsumer do
  use KafkaEx.Consumer.GenConsumer
  # ...
end

ConsumerGroup Changes

# Before (0.x)
KafkaEx.ConsumerGroup.start_link(
MyConsumer, "my-group", ["topic"],
  # ...
)

# After (1.0)
KafkaEx.Consumer.ConsumerGroup.start_link(
  MyConsumer, "my-group", ["topic"],
  # ...
)

Migration Checklist

  • [ ] Remove kafka_version from config
  • [ ] Update KafkaEx.GenConsumer to KafkaEx.Consumer.GenConsumer (required - code will not compile)
  • [ ] Update KafkaEx.ConsumerGroup to KafkaEx.Consumer.ConsumerGroup (required - code will not compile)
  • [ ] Update code to use KafkaEx.API functions (optional but recommended)
  • [ ] Update any references to KafkaEx.New.* modules
  • [ ] Run tests and fix deprecation warnings
  • [ ] Verify with your Kafka cluster

Important: Old module names (KafkaEx.GenConsumer, KafkaEx.ConsumerGroup, etc.) are not aliased. Code using old module names will fail to compile immediately. All references must be updated.

New Features in 1.0

Explicit Client API

The new KafkaEx.API module provides explicit, client-based functions:

{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])

# Produce
{:ok, metadata} = KafkaEx.API.produce_one(client, "topic", 0, "value")

# Fetch
{:ok, result} = KafkaEx.API.fetch(client, "topic", 0, 0)

# Offsets
{:ok, offset} = KafkaEx.API.latest_offset(client, "topic", 0)
{:ok, _} = KafkaEx.API.commit_offset(client, "group", "topic", [%{partition_num: 0, offset: offset}])

# Topic management
{:ok, _} = KafkaEx.API.create_topic(client, "new-topic", num_partitions: 3)

Automatic API Version Negotiation

No need to configure Kafka versions - the client automatically negotiates the best API version with your brokers.

Telemetry & Observability

Built-in telemetry support for monitoring connections, requests, and consumer operations:

:telemetry.attach(
  "kafka-handler",
  [:kafka_ex, :request, :stop],
  &MyApp.handle_event/4,
  nil
)

See README.md for complete event reference and setup examples.

Compression Support

Support for multiple compression formats on a per-request basis:

# Gzip compression (built-in)
{:ok, _} = KafkaEx.API.produce(client, "topic", 0, messages, compression: :gzip)

# Supported: :gzip, :snappy, :lz4, :zstd

See README.md for details on all compression formats.

SASL Authentication

Full SASL support including PLAIN, SCRAM-SHA-256/512, OAUTHBEARER, and AWS MSK IAM:

# SCRAM example
config :kafka_ex,
  brokers: [{"localhost", 9292}],
  use_ssl: true,
  sasl: %{
    mechanism: :scram,
    username: "user",
    password: "pass",
    mechanism_opts: %{algo: :sha256}
  }

See AUTH.md for complete configuration examples for all authentication mechanisms.

Getting Help