KafkaEx (kafka_ex v1.0.0-rc.1)
KafkaEx Application and Worker Management
This module handles application lifecycle and worker management for KafkaEx.
For Kafka operations (produce, fetch, etc.), use KafkaEx.API.
Usage
Start a client for Kafka operations:
{:ok, client} = KafkaEx.API.start_client()
{:ok, offset} = KafkaEx.API.produce_one(client, "topic", 0, "message")
Or use the KafkaEx.API behaviour in your own module:
defmodule MyApp.Kafka do
use KafkaEx.API, client: MyApp.KafkaClient
end
MyApp.Kafka.produce_one("topic", 0, "message")
Worker-based API (Legacy)
You can also create named workers under the KafkaEx supervisor:
{:ok, pid} = KafkaEx.create_worker(:my_worker)
{:ok, offset} = KafkaEx.API.produce_one(:my_worker, "topic", 0, "message")
Summary
Functions
Builds worker options by merging with application config defaults.
Returns the consumer group name for the given worker.
Creates a KafkaEx worker under the application supervisor.
Starts and links a worker outside of a supervision tree.
Stops a worker created with create_worker/2.
Returns true if the input is a valid consumer group or :no_consumer_group.
Types
@type worker_init() :: [worker_setting()]
@type worker_setting() ::
  {:uris, uri()}
  | {:consumer_group, binary() | :no_consumer_group}
  | {:metadata_update_interval, non_neg_integer()}
  | {:consumer_group_update_interval, non_neg_integer()}
  | {:ssl_options, ssl_options()}
  | {:auth, KafkaEx.Auth.Config.t() | nil}
  | {:initial_topics, [binary()]}
Functions
@spec build_worker_options(worker_init()) :: {:ok, worker_init()} | {:error, :invalid_consumer_group}
Builds worker options by merging with application config defaults.
Returns {:error, :invalid_consumer_group} if the consumer group is invalid.
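The merge-then-validate behaviour described above can be illustrated with a standalone sketch. This is not KafkaEx's implementation: the module name WorkerOptionsSketch, the default values, and the validity rule (a non-empty binary or :no_consumer_group) are all assumptions made for illustration.

```elixir
defmodule WorkerOptionsSketch do
  @moduledoc false

  # Hypothetical defaults standing in for values read from application config.
  @defaults [
    uris: [{"localhost", 9092}],
    consumer_group: "kafka_ex",
    metadata_update_interval: 30_000,
    consumer_group_update_interval: 30_000
  ]

  # Assumed validity rule: a non-empty binary or the atom :no_consumer_group.
  def valid_group?(:no_consumer_group), do: true
  def valid_group?(group) when is_binary(group), do: byte_size(group) > 0
  def valid_group?(_other), do: false

  # Caller-supplied options take precedence over defaults;
  # the consumer group of the merged result is then validated.
  def build(opts, defaults \\ @defaults) do
    merged = Keyword.merge(defaults, opts)

    if valid_group?(Keyword.get(merged, :consumer_group)) do
      {:ok, merged}
    else
      {:error, :invalid_consumer_group}
    end
  end
end
```

Validating after the merge means an invalid group is rejected whether it comes from the caller or from the defaults.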
Returns the consumer group name for the given worker.
The worker may be given as an atom or a pid. The default worker is used if none is specified.
@spec create_worker(atom(), worker_init()) :: Supervisor.on_start_child()
Creates a KafkaEx worker under the application supervisor.
Options
consumer_group: Name of the consumer group, :no_consumer_group for none
uris: List of brokers as {"host", port} tuples
metadata_update_interval: Metadata refresh interval in ms (default: 30000)
consumer_group_update_interval: Consumer group refresh interval in ms (default: 30000)
use_ssl: Enable SSL connections (default: false)
ssl_options: SSL options (see Erlang ssl docs)
auth: SASL authentication config (see KafkaEx.Auth.Config)
Example
{:ok, pid} = KafkaEx.create_worker(:my_worker)
{:ok, pid} = KafkaEx.create_worker(:my_worker, uris: [{"localhost", 9092}])
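Because create_worker/2 returns Supervisor.on_start_child(), a bare {:ok, pid} match raises when the worker already exists. The sketch below shows one way to collapse the possible results into {:ok, pid} or {:error, reason}. WorkerStartSketch is a hypothetical helper, not part of KafkaEx, and treating :already_started as success is a design assumption.

```elixir
defmodule WorkerStartSketch do
  @moduledoc false

  # Hypothetical helper: normalises a Supervisor.on_start_child() result.
  def normalize({:ok, pid}) when is_pid(pid), do: {:ok, pid}
  def normalize({:ok, pid, _info}) when is_pid(pid), do: {:ok, pid}

  # Assumption: an already-running worker with this name counts as success.
  def normalize({:error, {:already_started, pid}}) when is_pid(pid), do: {:ok, pid}

  def normalize({:error, reason}), do: {:error, reason}

  # Anything else (for example {:ok, :undefined}) is reported as an error.
  def normalize(other), do: {:error, {:unexpected, other}}
end

# Intended use (requires kafka_ex and a reachable broker):
#   KafkaEx.create_worker(:my_worker) |> WorkerStartSketch.normalize()
```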
@spec start_link_worker(atom(), [worker_setting() | {:server_impl, module()}]) :: GenServer.on_start()
Starts and links a worker outside of a supervision tree.
Takes the same arguments as create_worker/2 plus:
server_impl: The GenServer module for the client (default: KafkaEx.Client)
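Since start_link_worker/2 returns GenServer.on_start(), one plausible use is to run the worker under a supervisor you own rather than the KafkaEx application supervisor. The sketch below only builds a child spec map. WorkerChildSpecSketch is a hypothetical helper, and the restart strategy shown is an assumption rather than a KafkaEx recommendation.

```elixir
defmodule WorkerChildSpecSketch do
  @moduledoc false

  # Hypothetical helper: builds a child spec that a host application's
  # supervisor can use to start a worker via KafkaEx.start_link_worker/2.
  def child_spec(name, opts \\ []) when is_atom(name) and is_list(opts) do
    %{
      id: name,
      start: {KafkaEx, :start_link_worker, [name, opts]},
      type: :worker,
      # Assumption: the worker should always be restarted if it exits.
      restart: :permanent
    }
  end
end

# Intended use (requires kafka_ex and a reachable broker):
#   children = [WorkerChildSpecSketch.child_spec(:my_worker, uris: [{"localhost", 9092}])]
#   Supervisor.start_link(children, strategy: :one_for_one)
```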
Stops a worker created with create_worker/2.
Returns :ok on success or {:error, reason} on failure.
Returns true if the input is a valid consumer group or :no_consumer_group.