KafkaEx.GenConsumer behaviour (kafka_ex v0.13.0)
A behaviour module for implementing a Kafka consumer.
A KafkaEx.GenConsumer is an Elixir process that consumes messages from Kafka. A single KafkaEx.GenConsumer process consumes from a single partition of a Kafka topic. Several KafkaEx.GenConsumer processes can be used to consume from multiple partitions or even multiple topics. Partition assignments for a group of KafkaEx.GenConsumer processes can be defined manually using KafkaEx.GenConsumer.Supervisor or coordinated across a cluster of nodes using KafkaEx.ConsumerGroup.
A KafkaEx.GenConsumer must implement handle_message_set/2. The remaining callbacks (init/2, init/3, handle_call/3, handle_cast/2, and handle_info/2) are given default implementations when you add use KafkaEx.GenConsumer to your module, leaving just handle_message_set/2 to be implemented. This is the recommended usage.
Example
The KafkaEx.GenConsumer behaviour abstracts common Kafka consumer interactions. KafkaEx.GenConsumer will take care of the details of determining a starting offset, fetching messages from a Kafka broker, and committing offsets for consumed messages. Developers are only required to implement handle_message_set/2 to process messages.
The following is a minimal example that logs each message as it's consumed:
```elixir
defmodule ExampleGenConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  require Logger

  # note - messages are delivered in batches
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end

    {:async_commit, state}
  end
end
```
handle_message_set/2 will be called with the batch of messages fetched from the broker. The number of messages in a batch is determined by the number of messages available and the max_bytes and min_bytes parameters of the fetch request (which can be configured in KafkaEx). In this example, because handle_message_set/2 always returns {:async_commit, new_state}, the message offsets will be committed automatically and asynchronously.
Committing Offsets
KafkaEx.GenConsumer manages a consumer's offsets by committing the offsets of consumed messages. KafkaEx supports two commit strategies: asynchronous and synchronous. The return value of handle_message_set/2 determines which strategy is used:
- {:sync_commit, new_state} causes synchronous offset commits.
- {:async_commit, new_state} causes asynchronous offset commits.
Note that with both offset commit strategies, only the offset of the final message in the message set is committed, and this is done after all messages in the set have been consumed. If you want to commit the offset of every message consumed, use the synchronous offset commit strategy and implement calls to KafkaEx.offset_commit/2 within your consumer as appropriate.
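Because the return value selects the strategy, a consumer can choose a strategy for each message set. The following is a minimal sketch and is not part of KafkaEx. The "checkpoint" marker value and the CheckpointConsumer module are hypothetical. use KafkaEx.GenConsumer is left out so the snippet runs without the kafka_ex dependency. Matching on %{value: value} also works for KafkaEx.Protocol.Fetch.Message structs, because structs are maps.

```elixir
defmodule CheckpointConsumer do
  # In a real consumer, add `use KafkaEx.GenConsumer` here. It is omitted so
  # this sketch compiles without the kafka_ex dependency.

  # Hypothetical marker value: any message carrying it forces a sync commit.
  @checkpoint "checkpoint"

  def handle_message_set(message_set, state) do
    if Enum.any?(message_set, fn %{value: value} -> value == @checkpoint end) do
      # Block further fetching until this batch's offset is committed.
      {:sync_commit, state}
    else
      # Leave the commit to the periodic auto-commit.
      {:async_commit, state}
    end
  end
end
```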
Synchronous offset commits
When handle_message_set/2 returns {:sync_commit, new_state}, the offset of the final message in the message set is committed immediately, before any more messages are fetched. This strategy requires a significant amount of communication with the broker and could correspondingly degrade consumer performance, but it keeps the offset commits tightly synchronized with the consumer state.
Choose the synchronous offset commit strategy if you want to favor consistency of offset commits over performance, or if you have a low rate of message arrival. The definition of a "low rate" depends on the situation, but tens of messages per second could be considered a "low rate" in most situations.
Asynchronous offset commits
When handle_message_set/2 returns {:async_commit, new_state}, KafkaEx will not commit offsets after every message set consumed. To avoid excessive network calls, the offsets are committed periodically (and when the worker terminates).
How often a KafkaEx.GenConsumer auto-commits offsets is controlled by the two configuration values :commit_interval and :commit_threshold.
- :commit_interval is the maximum time (in milliseconds) that a KafkaEx.GenConsumer will delay committing the offset for an acknowledged message.
- :commit_threshold is the maximum number of acknowledged messages that a KafkaEx.GenConsumer will allow to be uncommitted before triggering a commit.
These can be set globally in the :kafka_ex app's environment or on a per-consumer basis by passing options to start_link/5:
```elixir
# In config/config.exs
config :kafka_ex,
  commit_interval: 5000,
  commit_threshold: 100

# As options to start_link/5
KafkaEx.GenConsumer.start_link(MyConsumer, "my_group", "topic", 0,
  commit_interval: 5000,
  commit_threshold: 100
)
```
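The following simplified, hypothetical model shows how the two settings interact. It illustrates the rule described above and is not KafkaEx's actual implementation. The CommitPolicy module and its commit_due?/3 function are made up for this sketch. Its defaults match the ones documented for start_link/5.

```elixir
defmodule CommitPolicy do
  # Hypothetical, simplified model of the async auto-commit rule.
  # This is NOT the kafka_ex implementation.
  #
  #   uncommitted - acknowledged messages whose offsets are not yet committed
  #   elapsed_ms  - milliseconds since the last commit
  def commit_due?(uncommitted, elapsed_ms, opts \\ []) do
    interval = Keyword.get(opts, :commit_interval, 5_000)
    threshold = Keyword.get(opts, :commit_threshold, 100)

    # Commit only if something is pending and either limit has been reached.
    uncommitted > 0 and (uncommitted >= threshold or elapsed_ms >= interval)
  end
end
```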
For low-volume topics, :commit_interval is the dominant factor for how often a KafkaEx.GenConsumer auto-commits. For high-volume topics, :commit_threshold is the dominant factor.
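Assuming a steady message rate, the time between auto-commits is set by whichever limit is reached first. The hypothetical CommitCadence module below computes this rough estimate. With the default settings, the crossover point is 20 messages per second (100 messages / 5 s).

```elixir
defmodule CommitCadence do
  # Rough estimate assuming a steady message rate: an auto-commit happens
  # when either limit is reached, whichever comes first.
  def commit_period_ms(rate_per_sec, commit_interval_ms, commit_threshold)
      when rate_per_sec > 0 do
    # Milliseconds needed to accumulate commit_threshold messages.
    threshold_ms = commit_threshold * 1000 / rate_per_sec
    min(commit_interval_ms, threshold_ms)
  end
end

# Defaults: commit_interval 5_000 ms, commit_threshold 100
CommitCadence.commit_period_ms(10, 5_000, 100)    # => 5000 (interval dominates)
CommitCadence.commit_period_ms(1_000, 5_000, 100) # => 100.0 (threshold dominates)
```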
Handler state and interaction
Use init/2 (or init/3) to initialize the consumer state, and use handle_call/3, handle_cast/2, or handle_info/2 to interact with a running consumer.
Example:
```elixir
defmodule MyConsumer do
  use KafkaEx.GenConsumer

  defmodule State do
    defstruct messages: [], calls: 0
  end

  def init(_topic, _partition) do
    {:ok, %State{}}
  end

  # Receives the :extra_consumer_args option; ignored in this example
  def init(_topic, _partition, _extra_args) do
    {:ok, %State{}}
  end

  # Accumulate every consumed message in the state
  def handle_message_set(message_set, state) do
    {:async_commit, %{state | messages: state.messages ++ message_set}}
  end

  # Reply with the accumulated messages and count the call
  def handle_call(:messages, _from, state) do
    {:reply, state.messages, %{state | calls: state.calls + 1}}
  end
end

alias KafkaEx.GenConsumer

{:ok, pid} = GenConsumer.start_link(MyConsumer, "consumer_group", "topic", 0)
GenConsumer.call(pid, :messages)
```
NOTE If you do not implement handle_call/3 or handle_cast/2, any calls to GenConsumer.call/3 or casts to GenConsumer.cast/2 will raise an error. Similarly, any messages sent to a GenConsumer will log an error if there is no corresponding handle_info/2 callback defined.
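Here is a minimal sketch of a consumer that implements handle_cast/2 and handle_info/2, so casts and plain messages do not fall through to the defaults. The CountingConsumer module, the :reset cast, and the :report message are hypothetical. use KafkaEx.GenConsumer is omitted so the callbacks can be called directly, the same way the Testing section exercises handle_message_set/2.

```elixir
defmodule CountingConsumer do
  # Add `use KafkaEx.GenConsumer` in a real consumer; it is omitted here so
  # the callbacks can be exercised directly without kafka_ex.
  require Logger

  def init(_topic, _partition), do: {:ok, %{count: 0}}

  def handle_message_set(message_set, state) do
    {:async_commit, %{state | count: state.count + length(message_set)}}
  end

  # Reached via KafkaEx.GenConsumer.cast(pid, :reset)
  def handle_cast(:reset, state), do: {:noreply, %{state | count: 0}}

  # Reached by sending the consumer process a plain message: send(pid, :report)
  def handle_info(:report, state) do
    Logger.info("messages consumed so far: #{state.count}")
    {:noreply, state}
  end
end
```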
Testing
A KafkaEx.GenConsumer can be unit-tested without a running Kafka broker by calling its handle_message_set/2 function directly with a list of messages. The following recipe can be used as a starting point when testing a KafkaEx.GenConsumer:
```elixir
defmodule ExampleGenConsumerTest do
  use ExUnit.Case, async: true

  alias KafkaEx.Protocol.Fetch.Message

  @topic "topic"
  @partition 0

  setup do
    {:ok, state} = ExampleGenConsumer.init(@topic, @partition)
    {:ok, %{state: state}}
  end

  test "it acks a message", %{state: state} do
    message_set = [%Message{offset: 0, value: "hello"}]

    {response, _new_state} =
      ExampleGenConsumer.handle_message_set(message_set, state)

    assert response == :async_commit
  end
end
```
Types
@type option() :: {:commit_interval, non_neg_integer()} | {:commit_threshold, non_neg_integer()} | {:auto_offset_reset, :none | :earliest | :latest} | {:api_versions, map()} | {:extra_consumer_args, map()}
Option values used when starting a KafkaEx.GenConsumer.
@type options() :: [option() | GenServer.option()]
Options used when starting a KafkaEx.GenConsumer.
Callbacks
@callback handle_call(call :: term(), from :: GenServer.from(), state :: term()) :: {:reply, reply_value :: term(), new_state :: term()} | {:stop, reason :: term(), reply_value :: term(), new_state :: term()} | {:stop, reason :: term(), new_state :: term()}
Invoked by KafkaEx.GenConsumer.call/3.
Note that the default implementation will raise a RuntimeError. If you want to interact with your consumer, you must implement a handle_call/3 function.
@callback handle_cast(cast :: term(), state :: term()) :: {:noreply, new_state :: term()} | {:stop, reason :: term(), new_state :: term()}
Invoked by KafkaEx.GenConsumer.cast/2.
Note that the default implementation will raise a RuntimeError. If you want to interact with your consumer, you must implement a handle_cast/2 function.
@callback handle_info(info :: term(), state :: term()) :: {:noreply, new_state :: term()} | {:stop, reason :: term(), new_state :: term()}
Invoked by sending messages to the consumer.
Note that the default implementation will log an error message. If you want to interact with your consumer through messages, you must implement a handle_info/2 function.
@callback handle_message_set( message_set :: [KafkaEx.Protocol.Fetch.Message.t()], state :: term() ) :: {:async_commit, new_state :: term()} | {:sync_commit, new_state :: term()}
Invoked for each message set consumed from a Kafka topic partition.
message_set is a message set fetched from a Kafka broker and state is the current state of the KafkaEx.GenConsumer.
Returning {:async_commit, new_state} acknowledges the messages in the set and continues to consume from the Kafka queue with new state new_state. Acknowledged messages will be auto-committed (possibly at a later time) based on the :commit_interval and :commit_threshold options.
Returning {:sync_commit, new_state} commits the message set synchronously before continuing to consume from the Kafka queue with new state new_state. Committing synchronously means that no more messages will be consumed until the offset is committed. :sync_commit should be used sparingly, because committing every message set synchronously would impact a consumer's performance and could result in excessive network traffic.
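One way to keep :sync_commit rare is to use it only periodically. The sketch below shows a hypothetical policy, not KafkaEx behaviour: it commits synchronously on every 10th message set and asynchronously otherwise. The module name and the @sync_every value are made up. use KafkaEx.GenConsumer is omitted so the sketch runs standalone.

```elixir
defmodule PeriodicSyncConsumer do
  # Hypothetical policy: a blocking commit on every 10th message set only.
  # (`use KafkaEx.GenConsumer` omitted so the sketch runs without kafka_ex.)
  @sync_every 10

  def init(_topic, _partition), do: {:ok, %{batches: 0}}

  def handle_message_set(_message_set, %{batches: batches} = state) do
    batches = batches + 1
    new_state = %{state | batches: batches}

    if rem(batches, @sync_every) == 0 do
      {:sync_commit, new_state}
    else
      {:async_commit, new_state}
    end
  end
end
```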
@callback init(topic :: binary(), partition :: non_neg_integer()) :: {:ok, state :: term()} | {:stop, reason :: term()}
Invoked when the server is started. start_link/5 will block until it returns.
topic and partition are the arguments passed to start_link/5. They identify the Kafka partition that the KafkaEx.GenConsumer will consume from.
Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and the process to start consuming from its assigned partition. state becomes the consumer's state.
Any other return value will cause start_link/5 to return {:error, error} and the process to exit.
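Because init/2 may also return {:stop, reason}, a consumer can refuse to start. In this hypothetical sketch, OrdersConsumer only accepts the topic "orders". As described above, any other return value makes start_link/5 return an error tuple instead of {:ok, pid}. use KafkaEx.GenConsumer is omitted so the clauses can be exercised directly.

```elixir
defmodule OrdersConsumer do
  # Hypothetical guard: only the "orders" topic is accepted.
  # (`use KafkaEx.GenConsumer` omitted so the sketch runs without kafka_ex.)
  def init("orders", partition), do: {:ok, %{partition: partition, seen: 0}}

  # Any other topic stops the consumer before it starts fetching.
  def init(topic, _partition), do: {:stop, {:unexpected_topic, topic}}
end
```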
@callback init( topic :: binary(), partition :: non_neg_integer(), extra_args :: map() ) :: {:ok, state :: term()} | {:stop, reason :: term()}
Invoked when the server is started. start_link/5 will block until it returns.
topic and partition are the arguments passed to start_link/5. They identify the Kafka partition that the KafkaEx.GenConsumer will consume from.
extra_args is the value of the :extra_consumer_args option to start_link/5.
The default implementation of this function calls init/2.
Returning {:ok, state} will cause start_link/5 to return {:ok, pid} and the process to start consuming from its assigned partition. state becomes the consumer's state.
Any other return value will cause start_link/5 to return {:error, error} and the process to exit.
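Here is a sketch of an init/3 implementation that reads configuration from extra_args. The :batch_label key is a hypothetical example of a value a caller might pass through :extra_consumer_args. use KafkaEx.GenConsumer is omitted so the sketch runs without kafka_ex. Overriding init/3 this way means the default delegation to init/2 no longer applies.

```elixir
defmodule ConfiguredConsumer do
  # extra_args is the map given as the :extra_consumer_args option.
  # :batch_label is a hypothetical key; fall back to "default" if absent.
  # (`use KafkaEx.GenConsumer` omitted so the sketch runs without kafka_ex.)
  def init(topic, partition, extra_args) do
    label = Map.get(extra_args, :batch_label, "default")
    {:ok, %{topic: topic, partition: partition, label: label}}
  end
end
```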
Functions
@spec call(GenServer.server(), term(), timeout()) :: term()
Forwards a GenServer.call/3 to the consumer implementation with the consumer's state.
The implementation must return a GenServer.call/3-compatible value of the form {:reply, reply_value, new_consumer_state}. The GenConsumer will turn this into an immediate timeout, which drives continued message consumption.
See the moduledoc for an example.
@spec cast(GenServer.server(), term()) :: term()
Forwards a GenServer.cast/2 to the consumer implementation with the consumer's state.
The implementation must return a GenServer.cast/2-compatible value of the form {:noreply, new_consumer_state}. The GenConsumer will turn this into an immediate timeout, which drives continued message consumption.
Returns a specification to start this module under a supervisor. See Supervisor.
Callback implementation for GenServer.init/1.
Returns the topic and partition id for this consumer process.
start_link(consumer_module, group_name, topic, partition, opts \\ [])
@spec start_link( consumer_module :: module(), consumer_group_name :: binary(), topic_name :: binary(), partition_id :: non_neg_integer(), options() ) :: GenServer.on_start()
Starts a KafkaEx.GenConsumer process linked to the current process.
This can be used to start the KafkaEx.GenConsumer as part of a supervision tree.
Once the consumer has been started, the init/3 callback of consumer_module is called with topic, partition, and the value of the :extra_consumer_args option as its arguments. The default init/3 calls init/2 with topic and partition.
group_name is the consumer group name that will be used for managing consumer offsets.
Options
- :commit_interval - The interval in milliseconds that the consumer will wait to commit offsets of handled messages. Default 5_000.
- :commit_threshold - Threshold number of messages consumed to commit offsets to the broker. Default 100.
- :auto_offset_reset - The policy for resetting offsets when an :offset_out_of_range error occurs. :earliest will move the offset to the oldest available, :latest moves to the most recent. If anything else is specified, the error will simply be raised.
- :fetch_options - Optional keyword list that is passed along to the KafkaEx.fetch call.
- :extra_consumer_args - Optional parameter that is passed along to the init/3 callback of the consumer module. Note that if init/3 is not implemented, the default implementation calls init/2, dropping the extra arguments.
NOTE :commit_interval, :auto_offset_reset, and :commit_threshold default to the application config (e.g., Application.get_env/2) if that value is present, or to the stated default if the application config is not present.
Any valid options for GenServer.start_link/3 can also be specified.
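Combining these options, a start_link/5 call might receive a keyword list like the one below. The values are illustrative only. The :max_bytes key inside :fetch_options is assumed to be a valid KafkaEx.fetch option, and :name is an ordinary GenServer.start_link/3 option. The start_link/5 call is commented out because it needs kafka_ex and a reachable broker.

```elixir
# Hypothetical option set for start_link/5; values are examples only.
opts = [
  commit_interval: 1_000,
  commit_threshold: 50,
  auto_offset_reset: :earliest,
  # Passed through to KafkaEx.fetch; :max_bytes is an assumed fetch option.
  fetch_options: [max_bytes: 500_000],
  # Handed to the consumer module's init/3 callback as extra_args.
  extra_consumer_args: %{batch_label: "nightly"},
  # Any GenServer.start_link/3 option is accepted as well.
  name: :orders_consumer_0
]

# Requires kafka_ex and a reachable broker:
# {:ok, pid} = KafkaEx.GenConsumer.start_link(MyConsumer, "my_group", "orders", 0, opts)
```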
Return Values
This function has the same return values as GenServer.start_link/3.