KafkaEx.GenConsumer behaviour (kafka_ex v0.13.0)

A behaviour module for implementing a Kafka consumer.

A KafkaEx.GenConsumer is an Elixir process that consumes messages from Kafka. A single KafkaEx.GenConsumer process consumes from a single partition of a Kafka topic. Several KafkaEx.GenConsumer processes can be used to consume from multiple partitions or even multiple topics. Partition assignments for a group of KafkaEx.GenConsumers can be defined manually using KafkaEx.GenConsumer.Supervisor or coordinated across a cluster of nodes using KafkaEx.ConsumerGroup.
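With manual assignments, one way to cover every partition of a topic is to start one KafkaEx.GenConsumer per partition under an ordinary Supervisor. The sketch below only builds the child specification maps, so it runs without a broker. The helper module PartitionChildSpecs, the consumer module MyConsumer, the group and topic names, and the partition count are hypothetical placeholders; KafkaEx.GenConsumer.Supervisor remains the packaged way to do this.

```elixir
defmodule PartitionChildSpecs do
  # Hypothetical helper: one child spec per partition, each of which starts
  # KafkaEx.GenConsumer.start_link/5 for that partition.
  def build(consumer_module, group_name, topic, partition_count, opts \\ [])
      when partition_count > 0 do
    for partition <- 0..(partition_count - 1) do
      %{
        # A unique id per partition lets the supervisor tell the children apart.
        id: {consumer_module, topic, partition},
        start:
          {KafkaEx.GenConsumer, :start_link,
           [consumer_module, group_name, topic, partition, opts]}
      }
    end
  end
end

# In an application the specs would be handed to a supervisor, e.g.:
# Supervisor.start_link(
#   PartitionChildSpecs.build(MyConsumer, "my_group", "my_topic", 3),
#   strategy: :one_for_one
# )
```

Each child then consumes exactly one partition, which matches the one-process-per-partition model described above.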

A KafkaEx.GenConsumer implements the callbacks described below. If you add use KafkaEx.GenConsumer to your module, default implementations are provided for all of them except handle_message_set/2, which is the only callback you have to write. This is the recommended usage.

Example

The KafkaEx.GenConsumer behaviour abstracts common Kafka consumer interactions. KafkaEx.GenConsumer will take care of the details of determining a starting offset, fetching messages from a Kafka broker, and committing offsets for consumed messages. Developers are only required to implement handle_message_set/2 to process messages.

The following is a minimal example that logs each message as it's consumed:

defmodule ExampleGenConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  require Logger

  # note - messages are delivered in batches
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end
    {:async_commit, state}
  end
end

handle_message_set/2 will be called with the batch of messages fetched from the broker. The number of messages in a batch is determined by the number of messages available and the max_bytes and min_bytes parameters of the fetch request (which can be configured in KafkaEx). In this example, because handle_message_set/2 always returns {:async_commit, new_state}, the message offsets will be automatically committed asynchronously.

Committing Offsets

KafkaEx.GenConsumer manages a consumer's offsets by committing the offsets of consumed messages. KafkaEx supports two commit strategies: asynchronous and synchronous. The return value of handle_message_set/2 determines which strategy is used:

  • {:sync_commit, new_state} causes synchronous offset commits.
  • {:async_commit, new_state} causes asynchronous offset commits.
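Because the return value chooses the strategy for each message set, a consumer can switch strategies at runtime. The sketch below keeps a hypothetical :commit_mode field in its state and records the offset of the last message in each set. It matches messages with a plain map pattern, which also matches KafkaEx.Protocol.Fetch.Message structs; the use KafkaEx.GenConsumer line is left out only so the callbacks compile and can be exercised without kafka_ex installed.

```elixir
defmodule ModeSwitchingConsumer do
  # A real consumer would start with `use KafkaEx.GenConsumer`; it is omitted
  # here so the sketch compiles without the kafka_ex dependency.

  # Hypothetical state: which strategy to return and the last offset handled.
  defmodule State do
    defstruct commit_mode: :async_commit, last_offset: nil
  end

  def init(_topic, _partition), do: {:ok, %State{}}

  # Defensive clause: an empty set leaves the state unchanged.
  def handle_message_set([], state), do: {state.commit_mode, state}

  def handle_message_set(message_set, %State{} = state) do
    # Map pattern: matches %KafkaEx.Protocol.Fetch.Message{} and plain maps.
    %{offset: last_offset} = List.last(message_set)
    new_state = %State{state | last_offset: last_offset}
    # The first tuple element (:async_commit or :sync_commit) picks the strategy.
    {new_state.commit_mode, new_state}
  end
end
```

Setting commit_mode to :sync_commit in the state (for example from handle_call/3) makes the next message set commit synchronously.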

Note that with both offset commit strategies, only the offset of the final message in the message set is committed, and the commit happens after the whole message set has been handled. If you want to commit the offset of every message consumed, use the synchronous offset commit strategy and call KafkaEx.offset_commit/2 from within your consumer as appropriate.

Synchronous offset commits

When handle_message_set/2 returns {:sync_commit, new_state}, the offset of the final message in the message set is committed immediately before fetching any more messages. This strategy requires a significant amount of communication with the broker and could correspondingly degrade consumer performance, but it will keep the offset commits tightly synchronized with the consumer state.

Choose the synchronous offset commit strategy if you want to favor consistency of offset commits over performance, or if you have a low rate of message arrival. The definition of a "low rate" depends on the situation, but tens of messages per second could be considered a "low rate" in most situations.

Asynchronous offset commits

When handle_message_set/2 returns {:async_commit, new_state}, KafkaEx will not commit offsets after every message set consumed. To avoid excessive network calls, the offsets are committed periodically (and when the worker terminates).

How often a KafkaEx.GenConsumer auto-commits offsets is controlled by the two configuration values :commit_interval and :commit_threshold.

  • :commit_interval is the maximum time (in milliseconds) that a KafkaEx.GenConsumer will delay committing the offset for an acknowledged message.

  • :commit_threshold is the maximum number of acknowledged messages that a KafkaEx.GenConsumer will allow to be uncommitted before triggering a commit.

These can be set globally in the :kafka_ex app's environment or on a per-consumer basis by passing options to start_link/5:

# In config/config.exs
config :kafka_ex,
  commit_interval: 5000,
  commit_threshold: 100

# As options to start_link/5
KafkaEx.GenConsumer.start_link(MyConsumer, "my_group", "topic", 0,
                               commit_interval: 5000,
                               commit_threshold: 100)
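The rule these two options express can be modelled as a predicate: an auto-commit is due once either limit is reached. This is a simplified illustration of the behaviour described above, not KafkaEx's internal code; the module and function names are hypothetical, and the fallback values are the defaults listed for start_link/5.

```elixir
defmodule CommitTrigger do
  # Hypothetical model of the auto-commit rule: commit when the number of
  # acknowledged-but-uncommitted messages reaches :commit_threshold, or when
  # :commit_interval milliseconds have passed since the last commit.
  def commit_due?(uncommitted_count, ms_since_last_commit, opts) do
    threshold = Keyword.get(opts, :commit_threshold, 100)
    interval = Keyword.get(opts, :commit_interval, 5_000)

    # Nothing to commit if no acknowledged messages are pending.
    uncommitted_count > 0 and
      (uncommitted_count >= threshold or ms_since_last_commit >= interval)
  end
end
```

For example, with the defaults, 3 pending messages trigger a commit once 5,000 ms have elapsed, while 100 pending messages trigger one immediately.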

For low-volume topics, :commit_interval is the dominant factor for how often a KafkaEx.GenConsumer auto-commits. For high-volume topics, :commit_threshold is the dominant factor.
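A back-of-the-envelope way to see which setting dominates: at a steady rate of r messages per second, the threshold is reached every commit_threshold / r seconds, so the time between commits is roughly the smaller of that and :commit_interval. The helper below assumes a steady arrival rate and ignores batching and broker latency; its name is hypothetical.

```elixir
defmodule CommitPeriod do
  # Estimated milliseconds between auto-commits for a steady arrival rate
  # (messages per second), using the option values passed to start_link/5.
  def estimate_ms(rate_per_sec, commit_threshold, commit_interval)
      when rate_per_sec > 0 do
    ms_to_reach_threshold = commit_threshold * 1000 / rate_per_sec
    min(ms_to_reach_threshold, commit_interval)
  end
end
```

With the defaults, CommitPeriod.estimate_ms(10, 100, 5_000) gives 5000 (the interval dominates), while CommitPeriod.estimate_ms(1_000, 100, 5_000) gives 100.0 (the threshold dominates).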

Handler state and interaction

Use init/2 (or init/3) to initialize the consumer state, and handle_call/3, handle_cast/2, or handle_info/2 to interact with a running consumer.

Example:

defmodule MyConsumer do
  use KafkaEx.GenConsumer

  defmodule State do
    defstruct messages: [], calls: 0
  end

  def init(_topic, _partition) do
    {:ok, %State{}}
  end

  # extra_args holds the :extra_consumer_args value; unused here
  def init(_topic, _partition, _extra_args) do
    {:ok, %State{}}
  end

  def handle_message_set(message_set, state) do
    {:async_commit, %{state | messages: state.messages ++ message_set}}
  end

  def handle_call(:messages, _from, state) do
    {:reply, state.messages, %{state | calls: state.calls + 1}}
  end
end

alias KafkaEx.GenConsumer

{:ok, pid} = GenConsumer.start_link(MyConsumer, "consumer_group", "topic", 0)
GenConsumer.call(pid, :messages)

NOTE: If you do not implement handle_call/3 or handle_cast/2, any calls made with GenConsumer.call/3 or casts made with GenConsumer.cast/2 will raise an error. Similarly, any message sent to a GenConsumer process will log an error if no handle_info/2 callback is defined.
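As a sketch of the other two interaction callbacks, the functions below would sit next to handle_call/3 in a module like MyConsumer: a cast that clears the buffered messages and a handle_info/2 clause for a :report message. The module name, the :clear and :report messages, and the catch-all clause are hypothetical; as before, use KafkaEx.GenConsumer is omitted so the functions compile on their own.

```elixir
defmodule InteractiveConsumer do
  # A real consumer would `use KafkaEx.GenConsumer` here.
  require Logger

  defmodule State do
    defstruct messages: [], calls: 0
  end

  # GenConsumer.cast(pid, :clear) ends up here: drop the buffered messages.
  def handle_cast(:clear, state) do
    {:noreply, %State{state | messages: []}}
  end

  # send(pid, :report) ends up here: log how many messages are buffered.
  def handle_info(:report, state) do
    Logger.info("buffered messages: #{length(state.messages)}")
    {:noreply, state}
  end

  # Catch-all: without it, an unmatched message would raise FunctionClauseError.
  def handle_info(_other, state), do: {:noreply, state}
end
```

Both callbacks return {:noreply, new_state}, which the GenConsumer turns into continued message consumption.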

Testing

A KafkaEx.GenConsumer can be unit-tested without a running Kafka broker by calling its callbacks directly, for example by passing a message set to handle_message_set/2. The following recipe can be used as a starting point when testing a KafkaEx.GenConsumer:

defmodule ExampleGenConsumerTest do
  use ExUnit.Case, async: true

  alias KafkaEx.Protocol.Fetch.Message

  @topic "topic"
  @partition 0

  setup do
    {:ok, state} = ExampleGenConsumer.init(@topic, @partition)
    {:ok, %{state: state}}
  end

  test "it acks a message", %{state: state} do
    message_set = [%Message{offset: 0, value: "hello"}]
    {response, _new_state} =
      ExampleGenConsumer.handle_message_set(message_set, state)
    assert response == :async_commit
  end
end

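Summary

Types

option()
Option values used when starting a KafkaEx.GenConsumer.

options()
Options used when starting a KafkaEx.GenConsumer.

Callbacks

handle_call(call, from, state)
Invoked by KafkaEx.GenConsumer.call/3.

handle_cast(cast, state)
Invoked by KafkaEx.GenConsumer.cast/2.

handle_info(info, state)
Invoked by sending messages to the consumer.

handle_message_set(message_set, state)
Invoked for each message set consumed from a Kafka topic partition.

init(topic, partition)
Invoked when the server is started. start_link/5 will block until it returns.

init(topic, partition, extra_args)
Invoked when the server is started. start_link/5 will block until it returns.

Functions

call(gen_consumer, message, timeout \\ 5000)
Forwards a GenServer.call/3 to the consumer implementation with the consumer's state.

cast(gen_consumer, message)
Forwards a GenServer.cast/2 to the consumer implementation with the consumer's state.

child_spec/1
Returns a specification to start this module under a supervisor.

init/1
Callback implementation for GenServer.init/1.

partition(gen_consumer, timeout \\ 5000)
Returns the topic and partition id for this consumer process.

start_link(consumer_module, group_name, topic, partition, opts \\ [])
Starts a KafkaEx.GenConsumer process linked to the current process.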

Types

@type option() ::
  {:commit_interval, non_neg_integer()}
  | {:commit_threshold, non_neg_integer()}
  | {:auto_offset_reset, :none | :earliest | :latest}
  | {:api_versions, map()}
  | {:extra_consumer_args, map()}

Option values used when starting a KafkaEx.GenConsumer.

@type options() :: [option() | GenServer.option()]

Options used when starting a KafkaEx.GenConsumer.

Callbacks

handle_call(call, from, state)

@callback handle_call(call :: term(), from :: GenServer.from(), state :: term()) ::
  {:reply, reply_value :: term(), new_state :: term()}
  | {:stop, reason :: term(), reply_value :: term(), new_state :: term()}
  | {:stop, reason :: term(), new_state :: term()}

Invoked by KafkaEx.GenConsumer.call/3.

Note that the default implementation raises a RuntimeError. To interact with your consumer through call/3, you must implement handle_call/3.

handle_cast(cast, state)

@callback handle_cast(cast :: term(), state :: term()) ::
  {:noreply, new_state :: term()}
  | {:stop, reason :: term(), new_state :: term()}

Invoked by KafkaEx.GenConsumer.cast/2.

Note that the default implementation raises a RuntimeError. To interact with your consumer through cast/2, you must implement handle_cast/2.

handle_info(info, state)

@callback handle_info(info :: term(), state :: term()) ::
  {:noreply, new_state :: term()}
  | {:stop, reason :: term(), new_state :: term()}

Invoked by sending messages to the consumer.

Note that the default implementation logs an error message. To handle messages sent directly to your consumer, you must implement handle_info/2.

handle_message_set(message_set, state)

@callback handle_message_set(
  message_set :: [KafkaEx.Protocol.Fetch.Message.t()],
  state :: term()
) ::
  {:async_commit, new_state :: term()} | {:sync_commit, new_state :: term()}

Invoked for each message set consumed from a Kafka topic partition.

message_set is a message set fetched from a Kafka broker and state is the current state of the KafkaEx.GenConsumer.

Returning {:async_commit, new_state} acknowledges the message set and continues to consume from the Kafka partition with new state new_state. Acknowledged messages will be auto-committed (possibly at a later time) based on the :commit_interval and :commit_threshold options.

Returning {:sync_commit, new_state} commits the message set synchronously before continuing to consume from the Kafka partition with new state new_state. Committing synchronously means that no more messages will be consumed until the offset has been committed. :sync_commit should be used sparingly, since committing every message set synchronously degrades a consumer's throughput and can result in excessive network traffic.

init(topic, partition)

@callback init(topic :: binary(), partition :: non_neg_integer()) ::
  {:ok, state :: term()} | {:stop, reason :: term()}

Invoked when the server is started. start_link/5 will block until it returns.

topic and partition are the arguments passed to start_link/5. They identify the Kafka partition that the KafkaEx.GenConsumer will consume from.

Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and the process to start consuming from its assigned partition. state becomes the consumer's state.

Any other return value will cause start_link/5 to return {:error, error} and the process to exit.
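For example, init/2 can refuse to start for a topic the consumer does not expect. The module name and the allowed topic list are hypothetical; returning {:stop, reason} instead of {:ok, state} makes start_link/5 return an error rather than a pid.

```elixir
defmodule GuardedConsumer do
  # A real consumer would `use KafkaEx.GenConsumer` here.

  # Hypothetical allow-list of topics this consumer knows how to handle.
  @allowed_topics ["orders"]

  def init(topic, partition) when topic in @allowed_topics do
    # The returned term becomes the consumer's state.
    {:ok, %{topic: topic, partition: partition, seen: 0}}
  end

  def init(topic, _partition) do
    # Anything other than {:ok, state} aborts startup.
    {:stop, {:unexpected_topic, topic}}
  end
end
```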

init(topic, partition, extra_args)

@callback init(
  topic :: binary(),
  partition :: non_neg_integer(),
  extra_args :: map()
) :: {:ok, state :: term()} | {:stop, reason :: term()}

Invoked when the server is started. start_link/5 will block until it returns.

topic and partition are the arguments passed to start_link/5. They identify the Kafka partition that the KafkaEx.GenConsumer will consume from.

extra_args is the value of the extra_consumer_args option to start_link/5.

The default implementation of this function calls init/2.

Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and the process to start consuming from its assigned partition. state becomes the consumer's state.

Any other return value will cause start_link/5 to return {:error, error} and the process to exit.
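A sketch of init/3 reading a setting from extra_args, the map given in the :extra_consumer_args option. The module name, the :batch_label key and its default are hypothetical, and treating a missing option as nil is an assumption, handled defensively here.

```elixir
defmodule ConfiguredConsumer do
  # A real consumer would `use KafkaEx.GenConsumer` here.

  # extra_args comes from the :extra_consumer_args option, e.g.
  # KafkaEx.GenConsumer.start_link(ConfiguredConsumer, "my_group", "topic", 0,
  #   extra_consumer_args: %{batch_label: "billing"})
  def init(topic, partition, extra_args) do
    # Assumption: if the option was omitted the value may be nil; use %{}.
    args = extra_args || %{}
    label = Map.get(args, :batch_label, "default")
    {:ok, %{topic: topic, partition: partition, label: label}}
  end
end
```

Because this module defines init/3 itself, the default init/3 that delegates to init/2 is no longer used.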

Functions

call(gen_consumer, message, timeout \\ 5000)

@spec call(GenServer.server(), term(), timeout()) :: term()

Forwards a GenServer.call/3 to the consumer implementation with the consumer's state.

The implementation must return a GenServer.call/3-compatible value of the form {:reply, reply_value, new_consumer_state}. The GenConsumer will turn this into an immediate timeout, which drives continued message consumption.

See the moduledoc for an example.

cast(gen_consumer, message)

@spec cast(GenServer.server(), term()) :: term()

Forwards a GenServer.cast/2 to the consumer implementation with the consumer's state.

The implementation must return a GenServer.cast/2-compatible value of the form {:noreply, new_consumer_state}. The GenConsumer will turn this into an immediate timeout, which drives continued message consumption.

child_spec/1

Returns a specification to start this module under a supervisor.

See Supervisor.

init/1

Callback implementation for GenServer.init/1.

partition(gen_consumer, timeout \\ 5000)

Returns the topic and partition id for this consumer process.

start_link(consumer_module, group_name, topic, partition, opts \\ [])

@spec start_link(
  consumer_module :: module(),
  consumer_group_name :: binary(),
  topic_name :: binary(),
  partition_id :: non_neg_integer(),
  options()
) :: GenServer.on_start()

Starts a KafkaEx.GenConsumer process linked to the current process.

This can be used to start the KafkaEx.GenConsumer as part of a supervision tree.

Once the consumer has been started, the init/3 callback of consumer_module is called with topic, partition, and the value of the :extra_consumer_args option; its default implementation calls init/2 with topic and partition. group_name is the consumer group name that will be used for managing consumer offsets.

Options

  • :commit_interval - The interval in milliseconds that the consumer will wait to commit offsets of handled messages. Default 5_000.

  • :commit_threshold - Threshold number of messages consumed to commit offsets to the broker. Default 100.

  • :auto_offset_reset - The policy for resetting offsets when an :offset_out_of_range error occurs. :earliest will move the offset to the oldest available, :latest moves to the most recent. If anything else is specified, the error will simply be raised.

  • :fetch_options - Optional keyword list that is passed along to the KafkaEx.fetch call.

  • :extra_consumer_args - Optional value that is passed as the extra_args argument of init/3 in the consumer module. Note that if init/3 is not implemented, the default implementation calls init/2, dropping the extra arguments.

NOTE: :commit_interval, :auto_offset_reset, and :commit_threshold default to the application config (read with Application.get_env/2) if a value is present there, or to the stated default otherwise.

Any valid options for GenServer.start_link/3 can also be specified.
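The precedence described in the NOTE above (explicit option, then the :kafka_ex application environment, then the built-in default) can be illustrated with a small resolver. This is a hypothetical model rather than KafkaEx's own code; it covers only :commit_interval and :commit_threshold, whose defaults are stated above.

```elixir
defmodule OptionResolver do
  # Built-in defaults stated for start_link/5.
  @defaults [commit_interval: 5_000, commit_threshold: 100]

  # Explicit start_link option > :kafka_ex application env > built-in default.
  def resolve(opts, key) do
    default = Keyword.fetch!(@defaults, key)

    case Keyword.fetch(opts, key) do
      {:ok, value} -> value
      :error -> Application.get_env(:kafka_ex, key, default)
    end
  end
end
```

With config :kafka_ex, commit_threshold: 250 in place, resolve([], :commit_threshold) yields 250, while resolve([commit_threshold: 10], :commit_threshold) yields 10.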

Return Values

This function has the same return values as GenServer.start_link/3.