View Source KafkaEx.Config (kafka_ex v1.0.0)
Configuration module for KafkaEx.
Configuration Options
Add to your config/config.exs:
config :kafka_ex,
# Required: List of broker addresses
brokers: [{"localhost", 9092}],
# Connection settings
use_ssl: false,
ssl_options: [],
# sasl: %{mechanism: :plain, username: "user", password: "pass"},
# Client settings
client_id: "kafka_ex",
sync_timeout: 3000,
# Consumer group (used for offset storage)
default_consumer_group: "kafka_ex",
# Partitioner for produce requests (when partition is nil)
partitioner: KafkaEx.Producer.Partitioner.Default,
# Application settings
disable_default_worker: false,
max_restarts: 10,
max_seconds: 60,
# GenConsumer settings
commit_interval: 5_000,
commit_threshold: 100,
auto_offset_reset: :noneBroker Configuration
Brokers can be configured in several formats:
# List of tuples
brokers: [{"host1", 9092}, {"host2", 9092}]
# CSV string
brokers: "host1:9092,host2:9092"
# Dynamic (MFA or function)
brokers: {MyModule, :get_brokers, []}
brokers: fn -> [...] endSASL Authentication
config :kafka_ex,
use_ssl: true, # Required for SASL
sasl: %{
mechanism: :scram, # :plain or :scram
username: System.get_env("KAFKA_USER"),
password: System.get_env("KAFKA_PASS"),
mechanism_opts: %{algorithm: :sha256} # For SCRAM
}Advanced tuning
Most users never need these. They are surfaced here (and in
config/config.exs) so you can see the real defaults without
having to grep the source.
:sleep_for_reconnect(ms, default400) — delay before retrying a broker reconnect. Applied insideKafkaEx.Clientafter the socket has died but before the nextcreate_socket/5attempt. Lower values reconnect faster at the cost of hammering a broker that's down; higher values smooth out flapping brokers at the cost of longer error windows.:metadata_update_interval(ms, default30_000) — periodic metadata refresh cadence. The client does a fullMetadatarequest this often to pick up cluster changes (leader elections, new topics, broker additions/removals). Lower values recover faster from cluster changes; higher values reduce request volume.:max_restarts(default10) — top-level application supervisor restart intensity. If more than:max_restartschildren exit in any:max_secondswindow, the supervisor shuts down.:max_seconds(default60) — the restart-intensity window that:max_restartsapplies to.
Example with non-default values:
config :kafka_ex,
sleep_for_reconnect: 1_000, # slower reconnect, less broker hammering
metadata_update_interval: 10_000, # 3x more frequent metadata refresh
max_restarts: 3, # tighter restart budget
max_seconds: 30
Summary
Functions
Returns the authentication configuration.
Returns the configured broker list.
Returns the configured client ID.
Returns the default consumer group name.
Returns the default consumer group name.
Returns the default worker name.
Returns true if default worker should not be started on application boot.
Returns the configured partitioner module.
Returns the server implementation module.
Returns SSL options for connections.
Returns true if SSL is enabled.
Functions
@spec auth_config() :: KafkaEx.Auth.Config.t() | nil
Returns the authentication configuration.
@spec brokers() :: [{String.t(), pos_integer()}] | nil
Returns the configured broker list.
Supports multiple formats:
- List of tuples:
[{"host", port}] - CSV string:
"host:port,host:port" - MFA tuple:
{module, function, args} - Zero-arity function
@spec client_id() :: String.t()
Returns the configured client ID.
@spec consumer_group() :: String.t() | :no_consumer_group
Returns the default consumer group name.
Deprecated: Use default_consumer_group/0 instead.
@spec default_consumer_group() :: String.t() | :no_consumer_group
Returns the default consumer group name.
Checks :default_consumer_group first, falls back to :consumer_group for
backward compatibility.
@spec default_worker() :: atom()
Returns the default worker name.
@spec disable_default_worker() :: boolean()
Returns true if default worker should not be started on application boot.
@spec partitioner() :: module()
Returns the configured partitioner module.
The partitioner is used when producing messages without specifying a partition.
Defaults to KafkaEx.Producer.Partitioner.Default.
@spec server_impl() :: module()
Returns the server implementation module.
Always returns KafkaEx.Client in v1.0+.
@spec ssl_options() :: Keyword.t()
Returns SSL options for connections.
Validates that ssl_options is a keyword list when SSL is enabled.
@spec use_ssl() :: boolean()
Returns true if SSL is enabled.