KafkaEx (kafka_ex v1.0.0-rc.1)

KafkaEx Application and Worker Management

This module handles application lifecycle and worker management for KafkaEx. For Kafka operations (produce, fetch, etc.), use KafkaEx.API.

Usage

Start a client for Kafka operations:

{:ok, client} = KafkaEx.API.start_client()
{:ok, offset} = KafkaEx.API.produce_one(client, "topic", 0, "message")

Or use the KafkaEx.API behaviour in your own module:

defmodule MyApp.Kafka do
  use KafkaEx.API, client: MyApp.KafkaClient
end

MyApp.Kafka.produce_one("topic", 0, "message")

Worker-based API (Legacy)

You can also create named workers under the KafkaEx supervisor:

{:ok, pid} = KafkaEx.create_worker(:my_worker)
{:ok, offset} = KafkaEx.API.produce_one(:my_worker, "topic", 0, "message")

Summary

Functions

Builds worker options by merging with application config defaults.

Returns the consumer group name for the given worker.

Creates a KafkaEx worker under the application supervisor.

Starts and links a worker outside of a supervision tree.

Stops a worker created with create_worker/2.

Returns true if the input is a valid consumer group or :no_consumer_group.

Types

@type ssl_options() :: [
  cacertfile: binary(),
  certfile: binary(),
  keyfile: binary(),
  password: binary()
]
@type uri() :: [{binary() | [char()], number()}]
@type worker_init() :: [worker_setting()]
@type worker_setting() ::
  {:uris, uri()}
  | {:consumer_group, binary() | :no_consumer_group}
  | {:metadata_update_interval, non_neg_integer()}
  | {:consumer_group_update_interval, non_neg_integer()}
  | {:ssl_options, ssl_options()}
  | {:auth, KafkaEx.Auth.Config.t() | nil}
  | {:initial_topics, [binary()]}
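
As an illustration of how these types fit together, here is a minimal sketch of a worker_init() keyword list. Only the keys come from the worker_setting() type above; every value (hosts, ports, group name, file paths, topic) is a hypothetical placeholder.

```elixir
# Hypothetical values throughout; only the keys are taken from worker_setting().
worker_init = [
  # uri() is a list of {host, port} tuples
  uris: [{"localhost", 9092}, {"localhost", 9093}],
  # a binary name, or :no_consumer_group
  consumer_group: "my_group",
  metadata_update_interval: 30_000,
  consumer_group_update_interval: 30_000,
  # placeholder paths, shaped like ssl_options()
  ssl_options: [
    cacertfile: "/path/to/ca-cert.pem",
    certfile: "/path/to/cert.pem",
    keyfile: "/path/to/key.pem"
  ],
  auth: nil,
  initial_topics: ["topic"]
]
```

A list like this can be passed as the second argument to create_worker/2.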

Functions

build_worker_options(worker_init)

@spec build_worker_options(worker_init()) ::
  {:ok, worker_init()} | {:error, :invalid_consumer_group}

Builds worker options by merging with application config defaults.

Returns {:error, :invalid_consumer_group} if the consumer group is invalid.
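
The merge-then-validate behaviour described here can be pictured with a small standalone sketch. It is not KafkaEx's implementation: the module name WorkerOptionsSketch, the default values, and the validity rule (a non-empty binary or :no_consumer_group) are all assumptions made for illustration.

```elixir
defmodule WorkerOptionsSketch do
  # Hypothetical defaults; in KafkaEx they come from the application config.
  @defaults [
    uris: [{"localhost", 9092}],
    consumer_group: "kafka_ex",
    metadata_update_interval: 30_000,
    consumer_group_update_interval: 30_000
  ]

  # Caller-supplied settings override the defaults key by key.
  def build(worker_init) when is_list(worker_init) do
    opts = Keyword.merge(@defaults, worker_init)

    if valid_group?(Keyword.fetch!(opts, :consumer_group)) do
      {:ok, opts}
    else
      {:error, :invalid_consumer_group}
    end
  end

  # Assumed rule: :no_consumer_group or a non-empty binary is valid.
  defp valid_group?(:no_consumer_group), do: true
  defp valid_group?(group) when is_binary(group), do: byte_size(group) > 0
  defp valid_group?(_other), do: false
end
```

Under these assumptions, WorkerOptionsSketch.build(consumer_group: "billing") keeps the default intervals and replaces only the group, while an empty group name yields the error tuple.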

consumer_group(worker \\ Config.default_worker())

@spec consumer_group(atom() | pid()) :: binary() | :no_consumer_group

Returns the consumer group name for the given worker.

The worker may be given as a registered name (atom) or a pid. If omitted, the default worker is used.

create_worker(name, worker_init \\ [])

@spec create_worker(atom(), worker_init()) :: Supervisor.on_start_child()

Creates a KafkaEx worker under the application supervisor.

Options

  • consumer_group: Name of the consumer group, or :no_consumer_group for none
  • uris: List of brokers as {"host", port} tuples
  • metadata_update_interval: Metadata refresh interval in ms (default: 30000)
  • consumer_group_update_interval: Consumer group refresh interval in ms (default: 30000)
  • use_ssl: Enable SSL connections (default: false)
  • ssl_options: SSL options (see Erlang ssl docs)
  • auth: SASL authentication config (see KafkaEx.Auth.Config)
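
Because create_worker/2 returns a Supervisor.on_start_child() value, callers may see more shapes than {:ok, pid}. The sketch below shows one possible way to normalise them. The module name WorkerStart is hypothetical, and treating {:error, {:already_started, pid}} as success is a design choice of this example, not documented KafkaEx behaviour.

```elixir
defmodule WorkerStart do
  # Collapse a Supervisor.on_start_child() result into {:ok, pid} | {:error, reason}.
  def normalize({:ok, pid}) when is_pid(pid), do: {:ok, pid}
  def normalize({:ok, pid, _info}) when is_pid(pid), do: {:ok, pid}

  # Assumption: an already running worker with this name is good enough.
  def normalize({:error, {:already_started, pid}}) when is_pid(pid), do: {:ok, pid}

  # Any other error is passed through unchanged.
  def normalize({:error, reason}), do: {:error, reason}
end
```

A call site could then read WorkerStart.normalize(KafkaEx.create_worker(:my_worker)), which makes repeated start-up attempts idempotent under the assumption above.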

Example

{:ok, pid} = KafkaEx.create_worker(:my_worker)
{:ok, pid} = KafkaEx.create_worker(:my_other_worker, uris: [{"localhost", 9092}])

stop_worker(worker)

@spec stop_worker(atom() | pid()) ::
  :ok | {:error, :not_found} | {:error, :simple_one_for_one}

Stops a worker created with create_worker/2.

Returns :ok on success, or {:error, :not_found} or {:error, :simple_one_for_one} on failure.
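
The three return values in the spec can be handled exhaustively with pattern matching. The following is a minimal sketch. The module name StopResult and the message texts are hypothetical, and the reading of :simple_one_for_one as an error passed through from the underlying supervisor is an assumption, since the source does not say when it occurs.

```elixir
defmodule StopResult do
  # One clause per value listed in the stop_worker spec.
  def explain(:ok), do: "worker stopped"
  def explain({:error, :not_found}), do: "no such worker is running"

  # Assumption: this error originates from the underlying supervisor.
  def explain({:error, :simple_one_for_one}),
    do: "supervisor rejected the request (simple_one_for_one)"
end
```

For example, StopResult.explain(KafkaEx.stop_worker(:my_worker)) would turn the result into a message suitable for logging.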

valid_consumer_group?(b)

@spec valid_consumer_group?(any()) :: boolean()

Returns true if the input is a valid consumer group or :no_consumer_group.