Client configuration

Here are some client configuration examples.

Simplest configuration

  config :my_app, MyApp.Client,
    connection: [
      bootstrap_servers: ["localhost:19092", "localhost:29092"],
      ssl: false
    ]

This client connects to the brokers over a non-SSL connection and produces messages using the default producer and the default partitioner.
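
For context, the MyApp.Client key in the configuration refers to a client module defined in the application. The following is a minimal sketch of that wiring. It assumes the client is declared with use Klife.Client and an otp_app option matching the config key, and that it can be started as a child of the application's supervisor. Neither detail is shown on this page, so check the Klife.Client documentation before relying on it.

```elixir
# Hypothetical client module; its name matches the key used in the config above.
defmodule MyApp.Client do
  # Assumption: Klife.Client takes the OTP app whose config holds the client settings.
  use Klife.Client, otp_app: :my_app
end

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Assumption: the client module can be supervised like any other child.
    children = [MyApp.Client]
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```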

SSL with SASL and custom socket opts

  config :my_app, MyApp.Client,
    connection: [
      bootstrap_servers: ["localhost:19092", "localhost:29092"],
      ssl: true,
      connect_opts: [
        verify: :verify_peer,
        cacertfile: Path.relative("test/compose_files/ssl/ca.crt")
      ],
      socket_opts: [delay_send: true],
      sasl_opts: [
        mechanism: "PLAIN",
        mechanism_opts: [
          username: "klifeusr",
          password: "klifepwd"
        ]
      ]
    ]

This client connects to the brokers over SSL. The connect_opts and socket_opts values are forwarded to the Erlang :ssl module to configure the socket properly. See its documentation for more details.
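
Hard-coded credentials are fine for a local test cluster, but in a deployed environment they are usually read at boot. Below is a sketch of the same options placed in config/runtime.exs. The environment variable names (KAFKA_CA_CERT_PATH, KAFKA_SASL_USERNAME, KAFKA_SASL_PASSWORD) are hypothetical and only illustrate the idea.

```elixir
# config/runtime.exs
import Config

config :my_app, MyApp.Client,
  connection: [
    bootstrap_servers: ["localhost:19092", "localhost:29092"],
    ssl: true,
    connect_opts: [
      verify: :verify_peer,
      # Hypothetical variable name; System.fetch_env!/1 raises at boot if it is unset.
      cacertfile: System.fetch_env!("KAFKA_CA_CERT_PATH")
    ],
    sasl_opts: [
      mechanism: "PLAIN",
      mechanism_opts: [
        # Hypothetical variable names for the SASL credentials.
        username: System.fetch_env!("KAFKA_SASL_USERNAME"),
        password: System.fetch_env!("KAFKA_SASL_PASSWORD")
      ]
    ]
  ]
```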

Defining and using multiple producers

  config :my_app, MyApp.Client,
    connection: [
      bootstrap_servers: ["localhost:19092", "localhost:29092"],
      ssl: false
    ],
    producers: [
      [
        name: :my_linger_ms_producer,
        linger_ms: 1_000
      ],
      [
        name: :my_custom_client_id_producer,
        client_id: "my_custom_client_id"
      ]
    ],
    topics: [
      [
        name: "my_topic_0",
        default_producer: :my_linger_ms_producer
      ],
      [
        name: "my_topic_1",
        default_producer: :my_custom_client_id_producer
      ]
    ]

This client will have a total of 3 producers: the default one plus the two defined in the configuration. You can see all the configuration options for producers in Klife.Producer.

Messages produced to my_topic_0 and my_topic_1 will use my_linger_ms_producer and my_custom_client_id_producer, respectively, unless a producer is set in the opts of the produce call. All other topics keep using the default producer.
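
The difference between the topic default and a per-call override can be sketched as follows. This assumes the client exposes a produce function that takes a Klife.Record and an optional keyword list, that the override key is named :producer, and that a successful call returns an :ok tuple. None of these details are confirmed on this page.

```elixir
alias Klife.Record

# No producer in opts: the topic's default_producer (:my_linger_ms_producer) is used.
record = %Record{topic: "my_topic_0", value: "some value"}
{:ok, _produced} = MyApp.Client.produce(record)

# Assumption: a :producer option overrides the topic default for this call only.
{:ok, _produced} = MyApp.Client.produce(record, producer: :my_custom_client_id_producer)
```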

Defining and using custom partitioner

First you need to implement a module following the Klife.Behaviours.Partitioner behaviour.

defmodule MyApp.MyCustomPartitioner do
  @behaviour Klife.Behaviours.Partitioner

  alias Klife.Record

  @impl true
  def get_partition(%Record{} = record, max_partition) do
    # Some logic to find the partition here!
  end
end

Then, reference it in your configuration.

  config :my_app, MyApp.Client,
    connection: [
      bootstrap_servers: ["localhost:19092", "localhost:29092"],
      ssl: false
    ],
    topics: [
      [
        name: "my_topic_0",
        default_partitioner: MyApp.MyCustomPartitioner
      ]
    ]

On this client, records produced to my_topic_0 without an explicit partition will have one assigned by the MyApp.MyCustomPartitioner module. All other topics keep using the default partitioner.
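
As an illustration of what the partitioning logic might look like, here is a sketch of a key-hash partitioner. It rests on two assumptions: that the record carries a key field, and that max_partition is the highest partition index, so valid results are 0..max_partition. The module name MyApp.KeyHashPartitioner is hypothetical, and this is not necessarily how the built-in default partitioner works.

```elixir
defmodule MyApp.KeyHashPartitioner do
  # Assumption: max_partition is the highest partition index (partitions are
  # numbered 0..max_partition), so there are max_partition + 1 partitions.
  @behaviour Klife.Behaviours.Partitioner

  @impl true
  # Records without a key are spread randomly across all partitions.
  def get_partition(%{key: nil}, max_partition) do
    :rand.uniform(max_partition + 1) - 1
  end

  # Records with a key always hash to the same partition.
  def get_partition(%{key: key}, max_partition) do
    :erlang.phash2(key, max_partition + 1)
  end
end
```

Hashing the key keeps all records with the same key on the same partition, which preserves their relative order. Records without a key have no ordering requirement, so they can be distributed freely.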

Defining multiple transactional (txn) pools

  config :my_app, MyApp.Client,
    connection: [
      bootstrap_servers: ["localhost:19092", "localhost:29092"],
      ssl: false
    ],
    default_txn_pool: :my_txn_pool,
    txn_pools: [
      [name: :my_txn_pool, base_txn_id: "my_custom_base_txn_id"],
      [name: :my_txn_pool_2, txn_timeout_ms: :timer.seconds(120)]
    ],
    topics: [[name: "my_topic_0"]]

This client will have a total of 3 txn pools: the default one plus the two defined in the configuration. You can see all the configuration options for txn pools in Klife.TxnProducerPool.

Using custom default producer, partitioner and txn pool

  config :my_app, MyApp.Client,
    connection: [
      bootstrap_servers: ["localhost:19092", "localhost:29092"],
      ssl: false
    ],
    default_producer: :my_custom_producer,
    producers: [[name: :my_custom_producer, linger_ms: 1_000]],
    default_partitioner: MyCustomPartitioner,
    default_txn_pool: :my_txn_pool,
    txn_pools: [[name: :my_txn_pool, base_txn_id: "my_custom_base_txn_id"]],
    topics: [[name: "my_topic_0"]]

This client will have only one producer (:my_custom_producer) and one txn pool (:my_txn_pool), and the default partitioner strategy will be MyCustomPartitioner. All three settings will be used in produce API calls to topics that do not define an override in the topics configuration.