brod_group_subscriber behaviour (brod v4.3.2)
A group subscriber is a gen_server which subscribes to partition consumers (pollers) and calls the user-defined callback functions for message processing.
An overview of what it does behind the scenes:
- Start a consumer group coordinator to manage the consumer group states, see brod_group_coordinator:start_link/6
- Start (if not already started) topic-consumers (pollers) and subscribe to the partition workers when group assignment is received from the group leader, see brod:start_consumer/3
- Call CallbackModule:handle_message/4 when messages are received from the partition consumers.
- Send acknowledged offsets to the group coordinator, which commits them to kafka periodically.
Types
-type ack_ref() :: {brod:topic(), brod:partition(), brod:offset()}.
-type cb_state() :: term().
-type consumer() :: #consumer{topic_partition :: {brod:topic(), brod:partition()}, consumer_pid :: undefined | pid() | {down, string(), any()}, consumer_mref :: undefined | reference(), begin_offset :: undefined | brod:offset(), acked_offset :: undefined | brod:offset(), last_offset :: undefined | brod:offset()}.
-type member_id() :: brod:group_member_id().
-type state() :: #state{client :: brod:client(), client_mref :: reference(), groupId :: brod:group_id(), memberId :: undefined | member_id(), generationId :: undefined | brod:group_generation_id(), coordinator :: pid(), consumers :: [consumer()], consumer_config :: brod:consumer_config(), is_blocked :: boolean(), subscribe_tref :: undefined | reference(), cb_module :: module(), cb_state :: cb_state(), message_type :: message | message_set}.
Callbacks
-callback handle_message(brod:topic(), brod:partition(), brod:message() | brod:message_set(), cb_state()) -> {ok, cb_state()} | {ok, ack, cb_state()} | {ok, ack_no_commit, cb_state()}.
-callback init(brod:group_id(), term()) -> {ok, cb_state()}.
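As an illustration of the two callbacks above, here is a minimal sketch of a callback module. The module name my_counting_subscriber and the choice of an integer counter as the callback state are assumptions made for this example, not part of brod. The message is treated as opaque, so the same code works whether the subscriber was started with message or message_set.

```erlang
-module(my_counting_subscriber).  %% hypothetical module name
-behaviour(brod_group_subscriber).

-export([init/2, handle_message/4]).

%% Called once when the subscriber initializes. The second argument is the
%% CbInitArg passed to brod_group_subscriber:start_link; it is ignored here.
%% The callback state in this sketch is a plain message counter.
init(_GroupId, _CbInitArg) ->
    {ok, 0}.

%% Count the delivery and acknowledge it immediately.
handle_message(_Topic, _Partition, _Message, Count) ->
    {ok, ack, Count + 1}.
```

Returning {ok, ack, NewState} means no separate call to ack/4 is needed for this message.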
Functions
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset(), boolean()) -> ok.
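Acknowledge an offset. Whether the acknowledged offset is also committed depends on the value of the Commit argument.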
-spec assign_partitions(pid(), [brod:group_member()], [{brod:topic(), brod:partition()}]) -> [{member_id(), [brod:partition_assignment()]}].
This function is called only when partition_assignment_strategy is set for callback_implemented in group config.
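The return shape in the spec above can be illustrated with a small round-robin assignment. This is a sketch under stated assumptions: the module name round_robin_assign and the function assign/2 are hypothetical, each group member is assumed to be a {MemberId, Metadata} tuple, and each partition assignment is assumed to be a {Topic, [Partition]} tuple. It is not brod's own assignment strategy.

```erlang
-module(round_robin_assign).  %% hypothetical helper module
-export([assign/2]).

%% Members: [{MemberId, Metadata}]; only MemberId is used here.
%% TopicPartitions: [{Topic, Partition}].
%% Returns [{MemberId, [{Topic, [Partition]}]}].
assign(Members, TopicPartitions) ->
    %% Sort both inputs so the result does not depend on input order.
    MemberIds = lists:sort([Id || {Id, _Meta} <- Members]),
    N = length(MemberIds),
    Indexed = lists:zip(lists:seq(0, length(TopicPartitions) - 1),
                        lists:sort(TopicPartitions)),
    %% Member number K receives every topic-partition whose index I
    %% satisfies I rem N =:= K.
    [{Id, group_by_topic([TP || {I, TP} <- Indexed, I rem N =:= K])}
     || {K, Id} <- lists:zip(lists:seq(0, N - 1), MemberIds)].

%% Collapse [{Topic, Partition}] into [{Topic, [Partition]}].
group_by_topic(TopicPartitions) ->
    Topics = lists:usort([T || {T, _} <- TopicPartitions]),
    [{T, [P || {T1, P} <- TopicPartitions, T1 =:= T]} || T <- Topics].
```

A callback module could delegate to such a helper from its own assignment callback. The exact name and arity of that callback should be checked against the brod source, since it is not listed in the Callbacks section above.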
assignments_received(Pid, MemberId, GenerationId, TopicAssignments)
-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.
-spec assignments_revoked(pid()) -> ok.
-spec commit(pid()) -> ok.
-spec commit(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
-spec get_committed_offsets(pid(), [{brod:topic(), brod:partition()}]) -> {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}.
Called by group coordinator when initializing the assignments for subscriber.
NOTE: This function is called only when offset_commit_policy is set to consumer_managed in group config.
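NOTE: The committed offsets should be the offsets for successfully processed (acknowledged) messages, not the begin_offset to start fetching from.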
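With offset_commit_policy set to consumer_managed, the committed offsets have to come from storage owned by the application. The sketch below assumes that the callback module exposes a get_committed_offsets/3 function taking the group ID, the assigned topic-partitions and the callback state, and that the callback state is a map from {Topic, Partition} to the last acknowledged offset. Both are assumptions for illustration; the callback is not listed in the Callbacks section above, so its exact signature should be verified against the brod source.

```erlang
-module(my_offset_store).  %% hypothetical module name
-export([get_committed_offsets/3]).

%% CbState is assumed to be a map: {Topic, Partition} => last acknowledged
%% offset. Topic-partitions with no stored offset are left out of the
%% result rather than given a made-up value.
get_committed_offsets(_GroupId, TopicPartitions, CbState) ->
    Found = [{TP, maps:get(TP, CbState)}
             || TP <- TopicPartitions, maps:is_key(TP, CbState)],
    {ok, Found, CbState}.
```

Note that the offsets returned are those of acknowledged messages, in line with the NOTE above, not the next offset to fetch.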
start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, CbModule, CbInitArg)
-spec start_link(brod:client(), brod:group_id(), [brod:topic()], brod:group_config(), brod:consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, message, CbModule, CbInitArg).
The CbModule:handle_message/4 callback handles a message. It returns one of:
- {ok, NewCallbackState}: The subscriber has received the message and will process it asynchronously. It should call brod_group_subscriber:ack/4 to acknowledge later.
- {ok, ack, NewCallbackState}: The subscriber has completed processing the message.
- {ok, ack_no_commit, NewCallbackState}: The subscriber has completed processing the message, but it is not ready to commit the offset yet. It should call brod_group_subscriber:commit/4 later.
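The asynchronous case, returning {ok, NewCallbackState} and acknowledging later, could look roughly like the sketch below. It assumes the subscriber was started with MessageType set to message, that the #kafka_message{} record is available through brod's brod.hrl header, and that the callback runs inside the subscriber process so that self() is the pid to pass to ack/4. The module name my_async_subscriber and the helper process/1 are hypothetical.

```erlang
-module(my_async_subscriber).  %% hypothetical module name
-behaviour(brod_group_subscriber).

%% Assumed to provide the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

init(_GroupId, _CbInitArg) ->
    {ok, #{}}.

handle_message(Topic, Partition,
               #kafka_message{offset = Offset, value = Value}, State) ->
    %% Assumption: the callback is invoked in the subscriber process,
    %% so self() here is the group subscriber pid.
    Subscriber = self(),
    %% Hand the work to a linked worker; a crash in the worker also
    %% takes the subscriber down instead of silently losing the ack.
    spawn_link(fun() ->
        ok = process(Value),
        ok = brod_group_subscriber:ack(Subscriber, Topic, Partition, Offset)
    end),
    %% No ack in the return value: the worker acknowledges when done.
    {ok, State}.

%% Placeholder for real message processing.
process(_Value) ->
    ok.
```

Because acknowledging a later offset may be treated as acknowledging earlier ones too, workers that finish out of order can lead to offsets being committed before earlier messages are fully processed. A real implementation would need to account for that.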
start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, MessageType, CbModule, CbInitArg)
-spec start_link(brod:client(), brod:group_id(), [brod:topic()], brod:group_config(), brod:consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
Start (link) a group subscriber.
- Client: Client ID (or pid, but not recommended) of the brod client.
- GroupId: Consumer group ID which should be unique per kafka cluster.
- Topics: Predefined set of topic names to join the group. NOTE: The group leader member will collect topics from all members and assign all collected topic-partitions to members in the group, i.e. members can join with an arbitrary set of topics.
- GroupConfig: For group coordinator, see brod_group_coordinator:start_link/6
- ConsumerConfig: For partition consumer, see brod_consumer:start_link/4
- MessageType: The type of message that is going to be handled by the callback module. Can be either message or message_set.
- CbModule: Callback module which should have the callback functions implemented for message processing.
- CbInitArg: The term() that is going to be passed to CbModule:init/2 as a second argument when initializing the subscriber.
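Putting the arguments together, a call to start_link/8 might look like the following sketch. The broker endpoint, the client ID my_client, the group and topic names, and the callback module my_subscriber are all placeholders chosen for this example. The group and consumer options shown (offset_commit_policy, offset_commit_interval_seconds, begin_offset) are options brod is understood to accept, but the authoritative list is in the brod_group_coordinator and brod_consumer documentation referenced above.

```erlang
-module(my_group_starter).  %% hypothetical module name
-export([start/0]).

start() ->
    {ok, _Apps} = application:ensure_all_started(brod),
    %% Placeholder endpoint and client ID.
    ok = brod:start_client([{"localhost", 9092}], my_client),
    %% Commit acknowledged offsets to kafka every 5 seconds.
    GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                   {offset_commit_interval_seconds, 5}],
    %% Start from the earliest offset when no committed offset exists.
    ConsumerConfig = [{begin_offset, earliest}],
    brod_group_subscriber:start_link(
      my_client,          %% Client
      <<"my-group">>,     %% GroupId
      [<<"my-topic">>],   %% Topics
      GroupConfig,
      ConsumerConfig,
      message,            %% MessageType: one message per callback call
      my_subscriber,      %% CbModule (hypothetical), implements init/2
                          %% and handle_message/4
      []).                %% CbInitArg, passed to my_subscriber:init/2
```

The client is referenced by its registered ID rather than by pid, following the recommendation in the Client description above.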
-spec stop(pid()) -> ok.
Stop the group subscriber and wait for the pid DOWN before return.