PhoenixMicro.Transport.Kafka (PhoenixMicro v1.0.0)

Copy Markdown View Source

Pure-Elixir Kafka transport for phoenix_micro.

Implements the Kafka binary wire protocol directly over TCP using Erlang's :gen_tcp. Zero external dependencies — no kafka_ex, no :brod, no crc32cer, no C compiler, no rebar3.

Works on every platform: Linux, macOS, Windows.

Supported Kafka wire APIs

APIKeyPurpose
Produce0Publish messages
Fetch1Consume messages
ListOffsets2Get earliest/latest offsets
Metadata3Topic/partition discovery
OffsetCommit8Commit consumer offsets
OffsetFetch9Fetch committed offsets
FindCoordinator10Consumer group coordinator
JoinGroup11Join consumer group
Heartbeat12Keep group membership alive
LeaveGroup13Clean group exit
SyncGroup14Receive partition assignment

Configuration

config :phoenix_micro,
  transport: :kafka,
  transports: [
    kafka: [
      brokers: [{"localhost", 9092}],
      # OR: url: "kafka://broker1:9092,broker2:9092",
      group_id:           "my_app",
      client_id:          "phoenix_micro",
      begin_offset:       :latest,       # :latest | :earliest | integer
      acks:               1,             # 0=none 1=leader -1=all
      ack_timeout_ms:     5_000,
      max_bytes:          1_048_576,
      fetch_wait_ms:      500,
      heartbeat_ms:       3_000,
      session_timeout_ms: 30_000
    ]
  ]

Docker Compose

services:
  kafka:
    image: bitnami/kafka:3.7
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_NODE_ID: "0"
      KAFKA_CFG_PROCESS_ROLES: "broker,controller"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"

Summary

Functions

Returns a specification to start this module under a supervisor.

Types

t()

@type t() :: %PhoenixMicro.Transport.Kafka{
  ack_timeout_ms: term(),
  acks: term(),
  begin_offset: term(),
  brokers: term(),
  client_id: term(),
  config: term(),
  connected: term(),
  fetch_wait_ms: term(),
  group_id: term(),
  heartbeat_ms: term(),
  max_bytes: term(),
  reconnect_attempts: term(),
  session_timeout_ms: term(),
  subscriptions: term(),
  task_sup: term()
}

Functions

child_spec(init_arg)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link(config \\ [])

@spec start_link(keyword()) :: GenServer.on_start()