View Source KafkaEx.Config (kafka_ex v1.0.0-rc.1)

Configuration module for KafkaEx.

Configuration Options

Add to your config/config.exs:

config :kafka_ex,
  # Required: List of broker addresses
  brokers: [{"localhost", 9092}],

  # Connection settings
  use_ssl: false,
  ssl_options: [],
  # sasl: %{mechanism: :plain, username: "user", password: "pass"},

  # Client settings
  client_id: "kafka_ex",
  sync_timeout: 3000,

  # Consumer group (used for offset storage)
  default_consumer_group: "kafka_ex",

  # Partitioner for produce requests (when partition is nil)
  partitioner: KafkaEx.Producer.Partitioner.Default,

  # Application settings
  disable_default_worker: false,
  max_restarts: 10,
  max_seconds: 60,

  # GenConsumer settings
  commit_interval: 5_000,
  commit_threshold: 100,
  auto_offset_reset: :none

Broker Configuration

Brokers can be configured in several formats:

# List of tuples
brokers: [{"host1", 9092}, {"host2", 9092}]

# CSV string
brokers: "host1:9092,host2:9092"

# Dynamic (MFA or function)
brokers: {MyModule, :get_brokers, []}
brokers: fn -> [...] end

SASL Authentication

config :kafka_ex,
  use_ssl: true,  # Required for SASL
  sasl: %{
    mechanism: :scram,  # :plain or :scram
    username: System.get_env("KAFKA_USER"),
    password: System.get_env("KAFKA_PASS"),
    mechanism_opts: %{algorithm: :sha256}  # For SCRAM
  }

Summary

Functions

Returns the authentication configuration.

Returns the configured broker list.

Returns the configured client ID.

consumer_group() deprecated

Returns the default consumer group name.

Returns the default consumer group name.

Returns the default worker name.

Returns true if default worker should not be started on application boot.

Returns the configured partitioner module.

Returns the server implementation module.

Returns SSL options for connections.

Returns true if SSL is enabled.

Functions

@spec auth_config() :: KafkaEx.Auth.Config.t() | nil

Returns the authentication configuration.

@spec brokers() :: [{String.t(), pos_integer()}] | nil

Returns the configured broker list.

Supports multiple formats:

  • List of tuples: [{"host", port}]
  • CSV string: "host:port,host:port"
  • MFA tuple: {module, function, args}
  • Zero-arity function
@spec client_id() :: String.t()

Returns the configured client ID.

This function is deprecated. Use default_consumer_group/0 instead.
@spec consumer_group() :: String.t() | :no_consumer_group

Returns the default consumer group name.

Deprecated: Use default_consumer_group/0 instead.

Link to this function

default_consumer_group()

View Source
@spec default_consumer_group() :: String.t() | :no_consumer_group

Returns the default consumer group name.

Checks :default_consumer_group first, falls back to :consumer_group for backward compatibility.

@spec default_worker() :: atom()

Returns the default worker name.

Link to this function

disable_default_worker()

View Source
@spec disable_default_worker() :: boolean()

Returns true if default worker should not be started on application boot.

@spec partitioner() :: module()

Returns the configured partitioner module.

The partitioner is used when producing messages without specifying a partition. Defaults to KafkaEx.Producer.Partitioner.Default.

@spec server_impl() :: module()

Returns the server implementation module.

Always returns KafkaEx.Client in v1.0+.

@spec ssl_options() :: Keyword.t()

Returns SSL options for connections.

Validates that ssl_options is a keyword list when SSL is enabled.

@spec use_ssl() :: boolean()

Returns true if SSL is enabled.