View Source Consumer Example

Info

There is also a more complete example here.

Ensure :brod is added to your deps on mix.exs

defp deps do
    [
      {:brod, "~> 3.10.0"}
    ]
end

Both examples require a brod client with name :kafka_client to be already started. You can do that either statically by specifying it in the configuration (see an example) or dynamically (e.g. by calling :brod.start_client([{"localhost", 9092}], :kafka_client)).

Group Subscriber

Either the brod_group_subscriber_v2 or brod_group_subscriber behaviours can be used to consume messages. The key difference is that the v2 subscriber runs a worker for each partition in a separate Erlang process, allowing parallel message processing.

Here is an example of callback module that implements the brod_group_subscriber_v2 behaviour to consume messages.

defmodule BrodSample.GroupSubscriberV2 do
  @behaviour :brod_group_subscriber_v2

  def child_spec(_arg) do
    config = %{
      client: :kafka_client,
      group_id: "consumer_group_name",
      topics: ["streaming.events"],
      cb_module: __MODULE__,
      consumer_config: [{:begin_offset, :earliest}],
      init_data: [],
      message_type: :message_set,
      group_config: [
        offset_commit_policy: :commit_to_kafka_v2,
        offset_commit_interval_seconds: 5,
        rejoin_delay_seconds: 60,
        reconnect_cool_down_seconds: 60
      ]
    }

    %{
      id: __MODULE__,
      start: {:brod_group_subscriber_v2, :start_link, [config]},
      type: :worker,
      restart: :temporary,
      shutdown: 5000
    }
  end

  @impl :brod_group_subscriber_v2
  def init(_group_id, _init_data), do: {:ok, []}

  @impl :brod_group_subscriber_v2
  def handle_message(message, _state) do
    IO.inspect(message, label: "message")
    {:ok, :commit, []}
  end
end

The example module implements child_spec/1 so that our consumer can be started by a Supervisor. The restart policy is set to :temporary because, in this case, if a message can not be processed, then there is no point in restarting. This might not always be the case.

See :brod_group_subscriber_v2.start_link/1 for details on the configuration options.

See docs for more details about the required or optional callbacks.

Partition Subscriber

A more low-level approach can be used when you want a more fine-grained control or when you have only a single partition.

defmodule BrodSample.PartitionSubscriber do
  use GenServer

  import Record, only: [defrecord: 2, extract: 2]

  defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")
  defrecord :kafka_message_set, extract(:kafka_message_set, from_lib: "brod/include/brod.hrl")
  defrecord :kafka_fetch_error, extract(:kafka_fetch_error, from_lib: "brod/include/brod.hrl")

  defmodule State do
    @enforce_keys [:consumer_pid]
    defstruct consumer_pid: nil
  end

  defmodule KafkaMessage do
    @enforce_keys [:offset, :key, :value, :ts]
    defstruct offset: nil, key: nil, value: nil, ts: nil
  end

  def start_link(topic, partition) do
    GenServer.start_link(__MODULE__, {topic, partition})
  end

  @impl true
  def init({topic, partition}) do
    # start the consumer(s)
    # if you have more than one partition, do it somewhere else once for all partitions
    # (e.g. in the parent process)
    :ok = :brod.start_consumer(:kafka_client, topic, begin_offset: :latest)

    {:ok, consumer_pid} = :brod.subscribe(:kafka_client, self(), topic, partition, [])
    # you may also want to handle error when subscribing
    # and to monitor the consumer pid (and resubscribe when the consumer crashes)

    {:ok, %State{consumer_pid: consumer_pid}}
  end

  @impl true
  def handle_info(
        {consumer_pid, kafka_message_set(messages: msgs)},
        %State{consumer_pid: consumer_pid} = state
      ) do
    for msg <- msgs do
      msg = kafka_message_to_struct(msg)

      # process the message...
      IO.inspect(msg)

      # and then acknowledge it
      :brod.consume_ack(consumer_pid, msg.offset)
    end

    {:noreply, state}
  end

  def handle_info({pid, kafka_fetch_error()} = error, %State{consumer_pid: pid} = state) do
    # you may want to handle the error differently
    {:stop, error, state}
  end

  defp kafka_message_to_struct(kafka_message(offset: offset, key: key, value: value, ts: ts)) do
    %KafkaMessage{
      offset: offset,
      key: key,
      value: value,
      ts: DateTime.from_unix!(ts, :millisecond)
    }
  end
end