View Source Publisher Example

Info

There is also a more complete example here.

Ensure :brod is added to your deps on mix.exs

defp deps do
    [
      {:brod, "~> 3.10.0"}
    ]
end

Client Configuration

To use producers, you have to start a client first.

You can do that by adding the following configuration (e.g. into config/dev.exs):

import Config

config :brod,
  clients: [
    kafka_client: [
      endpoints: [localhost: 9092],
      auto_start_producers: true,
      # The following :ssl and :sasl configs are not
      # required when running kafka locally unauthenticated
      ssl: true,
      sasl: {
        :plain,
        System.get_env("KAFKA_CLUSTER_API_KEY"),
        System.get_env("KAFKA_CLUSTER_API_SECRET")
      }
    ]
  ]

or by starting it dynamically with this snippet (you can also add SSL/SASL configuration if you want to):

:brod.start_client([localhost: 9092], :kafka_client, auto_start_producers: true)

Note: kafka_client can be any valid atom. And :endpoints accepts multiple host port tuples (e.g. endpoints: [{"192.168.0.2", 9092}, {"192.168.0.3", 9092}, ...]).

If you don't pass the auto_start_producers: true option, you also have to manually start producers before calling :brod.produce_sync/5 (and other produce functions). For example like this: :brod.start_producer(:kafka_client, "my_topic", []).

See :brod.start_client/3 for a list of all available options.

Publisher

To send a message with brod we can use the :brod.produce_sync/5 function

defmodule BrodExample.Publisher do
  def publish(topic, partition, key, message) do
    :brod.produce_sync(:kafka_client, topic, :hash, key, message)
  end
end

There are also other ways (functions) how to produce messages, you can find them in the overview and in the brod module documentation.

Using partition key

When providing :hash as the partition when calling :brod.produce_sync/5 is equivalent to the following:

{:ok, count} = :brod.get_partitions_count(:kafka_client, topic)
partition = rem(:erlang.phash2(key), count)
:brod.produce_sync(:kafka_client, topic, partition, key, message)

Internally brod will get the partition count, generate a hash for the key within the range of partitions, and publish the message to the calculated hash. This is the same sticky routing that Kafka's ProducerRecord implements:

If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.