Pure-Elixir Kafka transport for phoenix_micro.
Implements the Kafka binary wire protocol directly over TCP using
Erlang's :gen_tcp. Zero external dependencies — no kafka_ex,
no :brod, no crc32cer, no C compiler, no rebar3.
Runs on Linux, macOS, and Windows without a native build step.
Supported Kafka wire APIs
| API | Key | Purpose |
|---|---|---|
| Produce | 0 | Publish messages |
| Fetch | 1 | Consume messages |
| ListOffsets | 2 | Get earliest/latest offsets |
| Metadata | 3 | Topic/partition discovery |
| OffsetCommit | 8 | Commit consumer offsets |
| OffsetFetch | 9 | Fetch committed offsets |
| FindCoordinator | 10 | Consumer group coordinator |
| JoinGroup | 11 | Join consumer group |
| Heartbeat | 12 | Keep group membership alive |
| LeaveGroup | 13 | Clean group exit |
| SyncGroup | 14 | Receive partition assignment |
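
To make the table concrete, here is a minimal sketch of how a request for one of these APIs could be framed on the wire. Kafka requests are a 4-byte big-endian size prefix followed by a header (API key, API version, correlation id, client id) and an API-specific body. The module name `KafkaFrameSketch` and its functions are hypothetical and are not part of this library's documented API; the header layout assumed is request header v1, and the body is left empty for brevity.

```elixir
defmodule KafkaFrameSketch do
  # Hypothetical helper for illustration only; not phoenix_micro's API.

  # API keys copied from the table above.
  @api_keys %{
    produce: 0,
    fetch: 1,
    list_offsets: 2,
    metadata: 3,
    offset_commit: 8,
    offset_fetch: 9,
    find_coordinator: 10,
    join_group: 11,
    heartbeat: 12,
    leave_group: 13,
    sync_group: 14
  }

  # Looks up the numeric API key for an atom such as :metadata.
  def api_key(name), do: Map.fetch!(@api_keys, name)

  # Builds a size-prefixed frame: request header (v1 layout assumed) plus body.
  # Integers in Elixir binaries are big-endian by default, which matches Kafka.
  def encode_request(api, api_version, correlation_id, client_id, body \\ <<>>) do
    payload =
      <<api_key(api)::16, api_version::16, correlation_id::32,
        byte_size(client_id)::16, client_id::binary, body::binary>>

    <<byte_size(payload)::32, payload::binary>>
  end
end

frame = KafkaFrameSketch.encode_request(:metadata, 1, 42, "phoenix_micro")
```

A socket opened with `:gen_tcp`'s `packet: 4` option adds and strips the same 4-byte length prefix automatically, in which case only the payload would be sent. Whether this transport relies on that option is not stated here.
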
Configuration
    config :phoenix_micro,
      transport: :kafka,
      transports: [
        kafka: [
          brokers: [{"localhost", 9092}],
          # OR: url: "kafka://broker1:9092,broker2:9092",
          group_id: "my_app",
          client_id: "phoenix_micro",
          begin_offset: :latest,       # :latest | :earliest | integer
          acks: 1,                     # 0=none 1=leader -1=all
          ack_timeout_ms: 5_000,
          max_bytes: 1_048_576,
          fetch_wait_ms: 500,
          heartbeat_ms: 3_000,
          session_timeout_ms: 30_000
        ]
      ]

Docker Compose
    services:
      kafka:
        image: bitnami/kafka:3.7
        ports:
          - "9092:9092"
        environment:
          KAFKA_CFG_NODE_ID: "0"
          KAFKA_CFG_PROCESS_ROLES: "broker,controller"
          KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
          KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
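
With the Compose broker listening on `localhost:9092`, either the `brokers:` list or the `url:` form from the Configuration section can point at it. The two forms carry the same information. The sketch below shows one plausible way a `kafka://` URL could be turned into `{host, port}` tuples; `BrokerUrlSketch` is a hypothetical helper, the library's actual parsing is not shown in these docs, and falling back to port 9092 when none is given is an assumption.

```elixir
defmodule BrokerUrlSketch do
  # Hypothetical helper for illustration only; not phoenix_micro's API.

  # Assumption: use Kafka's conventional port when the URL omits one.
  @default_port 9092

  # Turns "kafka://h1:9092,h2:9093" into [{"h1", 9092}, {"h2", 9093}].
  # Any other scheme raises FunctionClauseError. IPv6 literals are not handled.
  def parse("kafka://" <> rest) do
    rest
    |> String.split(",", trim: true)
    |> Enum.map(&parse_broker/1)
  end

  defp parse_broker(entry) do
    case String.split(String.trim(entry), ":", parts: 2) do
      [host, port] -> {host, String.to_integer(port)}
      [host] -> {host, @default_port}
    end
  end
end

brokers = BrokerUrlSketch.parse("kafka://broker1:9092,broker2:9092")
```
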
Types
    @type t() :: %PhoenixMicro.Transport.Kafka{
      ack_timeout_ms: term(),
      acks: term(),
      begin_offset: term(),
      brokers: term(),
      client_id: term(),
      config: term(),
      connected: term(),
      fetch_wait_ms: term(),
      group_id: term(),
      heartbeat_ms: term(),
      max_bytes: term(),
      reconnect_attempts: term(),
      session_timeout_ms: term(),
      subscriptions: term(),
      task_sup: term()
    }
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec start_link(keyword()) :: GenServer.on_start()
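
The two specs above follow the usual GenServer pattern: `child_spec/1` wraps a keyword list in a start tuple, and a supervisor then calls `start_link/1` with it. The sketch below illustrates that pattern with a stand-in module, `KafkaStandIn`, so it runs without the library. The option keys passed are assumed to mirror the Configuration section; the options the real `start_link/1` accepts are not documented here, and phoenix_micro may start the transport itself from application config.

```elixir
defmodule KafkaStandIn do
  # Stand-in for PhoenixMicro.Transport.Kafka; illustration only.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}
end

# Assumed option keys, borrowed from the Configuration section.
opts = [brokers: [{"localhost", 9092}], group_id: "my_app"]

# `use GenServer` defines child_spec/1, which points at start_link/1.
spec = KafkaStandIn.child_spec(opts)

# A {module, opts} child makes the supervisor call module.child_spec(opts).
{:ok, sup} = Supervisor.start_link([{KafkaStandIn, opts}], strategy: :one_for_one)
```

Under the same assumptions, the real module would be listed as `{PhoenixMicro.Transport.Kafka, opts}` in a supervisor's children.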