KafkaEx.Config (kafka_ex v1.0.0-rc.1)
Configuration module for KafkaEx.
Configuration Options
Add to your config/config.exs:
config :kafka_ex,
  # Required: List of broker addresses
  brokers: [{"localhost", 9092}],
  # Connection settings
  use_ssl: false,
  ssl_options: [],
  # sasl: %{mechanism: :plain, username: "user", password: "pass"},
  # Client settings
  client_id: "kafka_ex",
  sync_timeout: 3000,
  # Consumer group (used for offset storage)
  default_consumer_group: "kafka_ex",
  # Partitioner for produce requests (when partition is nil)
  partitioner: KafkaEx.Producer.Partitioner.Default,
  # Application settings
  disable_default_worker: false,
  max_restarts: 10,
  max_seconds: 60,
  # GenConsumer settings
  commit_interval: 5_000,
  commit_threshold: 100,
  auto_offset_reset: :none
Broker Configuration
Brokers can be configured in several formats:
# List of tuples
brokers: [{"host1", 9092}, {"host2", 9092}]
# CSV string
brokers: "host1:9092,host2:9092"
# Dynamic (MFA or function)
brokers: {MyModule, :get_brokers, []}
brokers: fn -> [...] end
SASL Authentication
config :kafka_ex,
  use_ssl: true, # Required for SASL
  sasl: %{
    mechanism: :scram, # :plain or :scram
    username: System.get_env("KAFKA_USER"),
    password: System.get_env("KAFKA_PASS"),
    mechanism_opts: %{algorithm: :sha256} # For SCRAM
  }
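Because config/config.exs is evaluated at build time, System.get_env/1 there reads the environment of the machine that compiles the project. A minimal sketch of the same SASL settings moved to config/runtime.exs follows, so the credentials are read when the application boots. The option keys are the ones shown above; the choice of System.fetch_env!/1 and the assumption that KafkaEx reads these settings at startup rather than at compile time are illustrative and not taken from the KafkaEx documentation.

```elixir
# config/runtime.exs (evaluated at boot, including inside releases)
import Config

config :kafka_ex,
  use_ssl: true,
  sasl: %{
    mechanism: :scram,
    # System.fetch_env!/1 raises if the variable is not set,
    # so a missing credential fails fast instead of becoming nil.
    username: System.fetch_env!("KAFKA_USER"),
    password: System.fetch_env!("KAFKA_PASS"),
    mechanism_opts: %{algorithm: :sha256}
  }
```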
Summary
Functions
auth_config(): Returns the authentication configuration.
brokers(): Returns the configured broker list.
client_id(): Returns the configured client ID.
consumer_group(): Returns the default consumer group name (deprecated).
default_consumer_group(): Returns the default consumer group name.
default_worker(): Returns the default worker name.
disable_default_worker(): Returns true if default worker should not be started on application boot.
partitioner(): Returns the configured partitioner module.
server_impl(): Returns the server implementation module.
ssl_options(): Returns SSL options for connections.
use_ssl(): Returns true if SSL is enabled.
Functions
@spec auth_config() :: KafkaEx.Auth.Config.t() | nil
Returns the authentication configuration.
@spec brokers() :: [{String.t(), pos_integer()}] | nil
Returns the configured broker list.
Supports multiple formats:
- List of tuples: [{"host", port}]
- CSV string: "host:port,host:port"
- MFA tuple: {module, function, args}
- Zero-arity function
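To make the accepted formats concrete, here is a minimal sketch of how they could be normalized into a list of {host, port} tuples. BrokerListSketch is a hypothetical module written for illustration; it is not KafkaEx's implementation, and it assumes plain host:port entries (no IPv6 literals).

```elixir
defmodule BrokerListSketch do
  @moduledoc """
  Hypothetical helper (not part of KafkaEx) that turns each supported
  broker format into a list of {host, port} tuples.
  """

  # CSV string such as "host1:9092,host2:9092"
  def normalize(csv) when is_binary(csv) do
    csv
    |> String.split(",", trim: true)
    |> Enum.map(fn entry ->
      [host, port] = entry |> String.trim() |> String.split(":")
      {host, String.to_integer(port)}
    end)
  end

  # MFA tuple: call the function, then normalize whatever it returns
  def normalize({mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    mod |> apply(fun, args) |> normalize()
  end

  # Zero-arity function: call it, then normalize the result
  def normalize(fun) when is_function(fun, 0), do: normalize(fun.())

  # Already a list of tuples, or nil when nothing is configured
  def normalize(list) when is_list(list), do: list
  def normalize(nil), do: nil
end
```

With this sketch, `BrokerListSketch.normalize("host1:9092,host2:9092")` returns `[{"host1", 9092}, {"host2", 9092}]`, and a zero-arity function returning that same string yields the same list.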
@spec client_id() :: String.t()
Returns the configured client ID.
@spec consumer_group() :: String.t() | :no_consumer_group
Returns the default consumer group name.
Deprecated: Use default_consumer_group/0 instead.
@spec default_consumer_group() :: String.t() | :no_consumer_group
Returns the default consumer group name.
Checks :default_consumer_group first, falls back to :consumer_group for
backward compatibility.
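The lookup order described above can be sketched as a pure function over a keyword list. ConsumerGroupFallbackSketch is hypothetical, and the "kafka_ex" value used when neither key is set is an assumption borrowed from the sample configuration, not a documented default.

```elixir
defmodule ConsumerGroupFallbackSketch do
  # Hypothetical illustration of the documented lookup order;
  # not KafkaEx's actual code.
  def resolve(config) when is_list(config) do
    case Keyword.fetch(config, :default_consumer_group) do
      # The new key wins whenever it is present
      {:ok, group} -> group
      # Otherwise fall back to the legacy key.
      # ASSUMPTION: "kafka_ex" when neither key is configured.
      :error -> Keyword.get(config, :consumer_group, "kafka_ex")
    end
  end
end
```

For example, `ConsumerGroupFallbackSketch.resolve(consumer_group: "legacy")` returns `"legacy"`, while passing both keys returns the value of `:default_consumer_group`.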
@spec default_worker() :: atom()
Returns the default worker name.
@spec disable_default_worker() :: boolean()
Returns true if default worker should not be started on application boot.
@spec partitioner() :: module()
Returns the configured partitioner module.
The partitioner is used when producing messages without specifying a partition.
Defaults to KafkaEx.Producer.Partitioner.Default.
@spec server_impl() :: module()
Returns the server implementation module.
Always returns KafkaEx.Client in v1.0+.
@spec ssl_options() :: Keyword.t()
Returns SSL options for connections.
Validates that ssl_options is a keyword list when SSL is enabled.
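A check of this kind could look like the sketch below. SslOptionsSketch is hypothetical; raising ArgumentError on invalid input and skipping the check when SSL is disabled are both assumptions, since the documentation above does not say how a failed validation is reported.

```elixir
defmodule SslOptionsSketch do
  # Hypothetical illustration; not KafkaEx's actual code.

  # ASSUMPTION: when SSL is disabled the options are passed through unchecked.
  def validate(false, ssl_options), do: ssl_options

  # When SSL is enabled, insist on a keyword list such as
  # [verify: :verify_peer].
  def validate(true, ssl_options) do
    if Keyword.keyword?(ssl_options) do
      ssl_options
    else
      # ASSUMPTION: invalid options raise; the real behaviour may differ.
      raise ArgumentError,
            "ssl_options must be a keyword list, got: #{inspect(ssl_options)}"
    end
  end
end
```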
@spec use_ssl() :: boolean()
Returns true if SSL is enabled.