brod_group_subscriber behaviour (brod v4.4.0)
A group subscriber is a gen_server which subscribes to partition consumers (pollers) and calls the user-defined callback functions for message processing.
An overview of what it does behind the scenes:
- Start a consumer group coordinator to manage the consumer group states, see brod_group_coordinator:start_link/6.
- Start (if not already started) topic consumers (pollers) and subscribe to the partition workers when the group assignment is received from the group leader, see brod:start_consumer/3.
- Call CallbackModule:handle_message/4 when messages are received from the partition consumers.
- Send acknowledged offsets to the group coordinator, which commits them to Kafka periodically.
Callbacks are documented in the source code of this module.
Summary
Functions
- ack/4: Acknowledge and commit an offset.
- ack/5: Acknowledge an offset. This call may or may not commit the group subscriber offset, depending on the value of the Commit argument.
- assign_partitions/3: This function is called only when partition_assignment_strategy is set to callback_implemented in the group config.
- assignments_received/4: Called by the group coordinator when a new assignment is received.
- assignments_revoked/1: Called by the group coordinator before re-joining the consumer group.
- commit/1: Commit all acked offsets. NOTE: This is an async call.
- commit/4: Commit an offset for a topic-partition. This is an asynchronous call.
- get_committed_offsets/2: Called by the group coordinator when initializing the assignments for the subscriber.
- start_link/7: Equivalent to start_link/8 with the message type set to message.
- start_link/8: Start (link) a group subscriber.
- stop/1: Stop the group subscriber and wait for the pid's DOWN message before returning.
Types
-type ack_ref() :: {brod:topic(), brod:partition(), brod:offset()}.
-type cb_state() :: term().
-type consumer() :: #consumer{topic_partition :: {brod:topic(), brod:partition()}, consumer_pid :: undefined | pid() | {down, string(), any()}, consumer_mref :: undefined | reference(), begin_offset :: undefined | brod:offset(), acked_offset :: undefined | brod:offset(), last_offset :: undefined | brod:offset()}.
-type member_id() :: brod:group_member_id().
-type state() :: #state{client :: brod:client(), client_mref :: reference(), groupId :: brod:group_id(), memberId :: undefined | member_id(), generationId :: undefined | brod:group_generation_id(), coordinator :: pid(), consumers :: [consumer()], consumer_config :: brod:consumer_config(), is_blocked :: boolean(), subscribe_tref :: undefined | reference(), cb_module :: module(), cb_state :: cb_state(), message_type :: message | message_set}.
Callbacks
-callback handle_message(brod:topic(), brod:partition(), brod:message() | brod:message_set(), cb_state()) -> {ok, cb_state()} | {ok, ack, cb_state()} | {ok, ack_no_commit, cb_state()}.
-callback init(brod:group_id(), term()) -> {ok, cb_state()}.
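As an illustration of the two callbacks above, here is a minimal sketch of a callback module. The module name `my_subscriber` and the message counter used as callback state are hypothetical; the `#kafka_message{}` record is assumed to come from `brod/include/brod.hrl`.

```erlang
-module(my_subscriber).
-behaviour(brod_group_subscriber).

%% Assumed to provide the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

%% Hypothetical callback state: number of messages handled so far.
init(_GroupId, _InitArg) ->
    {ok, 0}.

%% Process each message synchronously, then ack (and commit) it.
handle_message(Topic, Partition,
               #kafka_message{offset = Offset, value = Value}, Count) ->
    io:format("~s-~p offset ~p: ~p~n", [Topic, Partition, Offset, Value]),
    {ok, ack, Count + 1}.
```

Returning `{ok, ack, State}` keeps the module simple because no separate call to `ack/4` is needed.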
Functions
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
Acknowledge and commit an offset. The subscriber may ack a later (greater) offset, which is treated as acknowledging all earlier (smaller) offsets as well. This also means that out-of-order acks may overwrite offset commits and lead to unnecessary message re-delivery after a restart.
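A sketch of asynchronous acknowledgement with ack/4 follows. It assumes that `init/2` runs inside the subscriber process, so `self()` there is the subscriber pid. The module name `async_subscriber` and the single worker process are hypothetical; one sequential worker is used so that acks for a partition are sent in offset order.

```erlang
-module(async_subscriber).
-behaviour(brod_group_subscriber).

%% Assumed to provide the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

%% Assumption: init/2 is evaluated in the subscriber process,
%% so self() is the pid to pass to ack/4 later.
init(_GroupId, _InitArg) ->
    Subscriber = self(),
    Worker = spawn_link(fun() -> worker_loop(Subscriber) end),
    {ok, Worker}.

%% Hand the message to the worker and return without acking.
handle_message(Topic, Partition, Msg, Worker) ->
    Worker ! {process, Topic, Partition, Msg},
    {ok, Worker}.

%% Handle messages one at a time, acking each after it is processed.
worker_loop(Subscriber) ->
    receive
        {process, Topic, Partition, #kafka_message{offset = Offset}} ->
            %% Real message processing would go here.
            ok = brod_group_subscriber:ack(Subscriber, Topic, Partition, Offset),
            worker_loop(Subscriber)
    end.
```

Spawning one process per message instead could complete out of order, which is the re-delivery risk described for this function.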
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset(), boolean()) -> ok.
Acknowledge an offset. This call may or may not commit the group subscriber offset, depending on the value of the Commit argument.
-spec assign_partitions(pid(), [brod:group_member()], [{brod:topic(), brod:partition()}]) -> [{member_id(), [brod:partition_assignment()]}].
This function is called only when partition_assignment_strategy is set to callback_implemented in the group config.
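To illustrate the shape of the return value in the spec above, here is a sketch of a round-robin assignment helper that a callback module could delegate to. The module `rr_assign` and the function `round_robin/2` are hypothetical and not part of brod. The sketch assumes that member IDs have already been extracted from the member list and that a partition assignment has the form `{Topic, [Partition]}`.

```erlang
-module(rr_assign).
-export([round_robin/2]).

%% Deal topic-partitions out to members in turn, then group each
%% member's share by topic. Assumes MemberIds is non-empty.
round_robin(MemberIds, TopicPartitions) ->
    N = length(MemberIds),
    Indexed = lists:zip(lists:seq(0, length(TopicPartitions) - 1),
                        TopicPartitions),
    [{Member, group_by_topic([TP || {I, TP} <- Indexed, I rem N =:= Idx])}
     || {Idx, Member} <- lists:zip(lists:seq(0, N - 1), MemberIds)].

%% Turn [{Topic, Partition}] into [{Topic, [Partition]}].
group_by_topic(TopicPartitions) ->
    Topics = lists:usort([T || {T, _} <- TopicPartitions]),
    [{T, [P || {T1, P} <- TopicPartitions, T1 =:= T]} || T <- Topics].
```

For two members and three partitions of one topic, the first member receives partitions 0 and 2 and the second receives partition 1.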
-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.
Called by the group coordinator when a new assignment is received.
-spec assignments_revoked(pid()) -> ok.
Called by group coordinator before re-joining the consumer group.
-spec commit(pid()) -> ok.
Commit all acked offsets. NOTE: This is an async call.
-spec commit(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
Commit an offset for a topic-partition. This is an asynchronous call.
-spec get_committed_offsets(pid(), [{brod:topic(), brod:partition()}]) -> {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}.
Called by the group coordinator when initializing the assignments for the subscriber.
NOTE: This function is called only when offset_commit_policy is set to consumer_managed in the group config.
NOTE: The committed offsets should be the offsets of successfully processed (acknowledged) messages, not the begin_offset to start fetching from.
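The return shape in the spec above can be illustrated with a small lookup helper. This is a sketch under assumptions: the module `offset_store` is hypothetical, and the store is taken to be a map from `{Topic, Partition}` to the offset of the last successfully processed message. Partitions with no stored offset are simply left out of the result.

```erlang
-module(offset_store).
-export([committed/2]).

%% Store maps {Topic, Partition} to the last processed offset.
%% The stored value is the acknowledged offset itself, not the
%% offset to begin fetching from.
committed(TopicPartitions, Store) ->
    {ok, [{TP, maps:get(TP, Store)}
          || TP <- TopicPartitions, maps:is_key(TP, Store)]}.
```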
-spec start_link(brod:client(), brod:group_id(), [brod:topic()], brod:group_config(), brod:consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, message, CbModule, CbInitArg).
The handle_message/4 callback handles a message and returns one of:
- {ok, NewCallbackState}: The subscriber has received the message and will process it asynchronously. It should call brod_group_subscriber:ack/4 to acknowledge later.
- {ok, ack, NewCallbackState}: The subscriber has completed processing the message.
- {ok, ack_no_commit, NewCallbackState}: The subscriber has completed processing the message, but it is not ready to commit the offset yet. It should call brod_group_subscriber:commit/4 later.
While this callback function is being evaluated, the fetch-ahead partition consumers keep fetching more messages behind the scenes unless prefetch_count and prefetch_bytes are set to 0 in the consumer config.
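The ack_no_commit return value can be combined with commit/4 to commit in batches. The following is a sketch under assumptions: the module name `batch_commit_subscriber` and the batch size are hypothetical, and the callback is assumed to run in the subscriber process so that `self()` can be passed to commit/4.

```erlang
-module(batch_commit_subscriber).
-behaviour(brod_group_subscriber).

%% Assumed to provide the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

-define(COMMIT_EVERY, 100). %% hypothetical batch size

%% State: map from {Topic, Partition} to messages seen since last commit.
init(_GroupId, _InitArg) ->
    {ok, #{}}.

handle_message(Topic, Partition, #kafka_message{offset = Offset}, Counts) ->
    Key = {Topic, Partition},
    case maps:get(Key, Counts, 0) + 1 of
        N when N >= ?COMMIT_EVERY ->
            %% commit/4 is asynchronous; self() is assumed to be the subscriber.
            ok = brod_group_subscriber:commit(self(), Topic, Partition, Offset),
            {ok, ack_no_commit, Counts#{Key => 0}};
        N ->
            {ok, ack_no_commit, Counts#{Key => N}}
    end.
```

Counting per topic-partition keeps one busy partition from delaying commits for the others.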
-spec start_link(brod:client(), brod:group_id(), [brod:topic()], brod:group_config(), brod:consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
Start (link) a group subscriber.
- Client: Client ID (or pid, but not recommended) of the brod client.
- GroupId: Consumer group ID, which should be unique per Kafka cluster.
- Topics: Predefined set of topic names to join the group.
  NOTE: The group leader collects topics from all members and assigns all collected topic-partitions to members in the group, i.e. members can join with an arbitrary set of topics.
- GroupConfig: For the group coordinator, see brod_group_coordinator:start_link/6.
- ConsumerConfig: For the partition consumer, see brod_consumer:start_link/4.
- MessageType: The type of message that is going to be handled by the callback module. Can be either message or message_set.
- CbModule: Callback module which should have the callback functions implemented for message processing.
- CbInitArg: The term() that is going to be passed to CbModule:init/2 as the second argument when initializing the subscriber.
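A sketch of starting a subscriber with these arguments follows. The broker address, client ID, group ID, topic name, and the callback module `my_subscriber` are all hypothetical placeholders, and the configuration values shown are illustrative choices rather than recommendations.

```erlang
-module(my_subscriber_app).
-export([start/0]).

start() ->
    {ok, _} = application:ensure_all_started(brod),
    %% Hypothetical broker endpoint and client ID.
    ok = brod:start_client([{"localhost", 9092}], my_client, []),
    GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                   {offset_commit_interval_seconds, 5}],
    ConsumerConfig = [{begin_offset, earliest}],
    %% my_subscriber is a hypothetical module implementing
    %% init/2 and handle_message/4.
    brod_group_subscriber:start_link(my_client, <<"my-group">>,
                                     [<<"my-topic">>],
                                     GroupConfig, ConsumerConfig,
                                     message, my_subscriber, []).
```

Passing the client ID rather than a pid follows the recommendation for the Client argument.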
-spec stop(pid()) -> ok.
Stop the group subscriber and wait for the pid's DOWN message before returning.