View Source brod_group_subscriber behaviour (brod v4.0.0)

A group subscriber is a gen_server which subscribes to partition consumers (poller) and calls the user-defined callback functions for message processing.

An overview of what it does behind the scene:
  1. Start a consumer group coordinator to manage the consumer group states, see brod_group_coordinator:start_link/6
  2. Start (if not already started) topic-consumers (pollers) and subscribe to the partition workers when group assignment is received from the group leader, see brod:start_consumer/3
  3. Call CallbackModule:handle_message/4 when messages are received from the partition consumers.
  4. Send acknowledged offsets to group coordinator which will be committed to kafka periodically.
Callbacks are documented in the source code of this module.

Summary

Functions

Acknowledge and commit an offset. The subscriber may ack a later (greater) offset which will be considered as multi-acking the earlier (smaller) offsets. This also means that disordered acks may overwrite offset commits and lead to unnecessary message re-delivery in case of restart.
Acknowledge an offset. This call may or may not commit group subscriber offset depending on the value of Commit argument
This function is called only when partition_assignment_strategy is set for callback_implemented in group config.
Called by group coordinator when there is new assignment received.
Called by group coordinator before re-joining the consumer group.
Commit all acked offsets. NOTE: This is an async call.
Commit offset for a topic. This is an asynchronous call

Called by group coordinator when initializing the assignments for subscriber.

Stop group subscriber, wait for pid DOWN before return.

Types

-type ack_ref() :: {brod:topic(), brod:partition(), brod:offset()}.
-type cb_state() :: term().
-type consumer() ::
          #consumer{topic_partition :: {brod:topic(), brod:partition()},
                    consumer_pid :: undefined | pid() | {down, string(), any()},
                    consumer_mref :: undefined | reference(),
                    begin_offset :: undefined | brod:offset(),
                    acked_offset :: undefined | brod:offset(),
                    last_offset :: undefined | brod:offset()}.
-type member_id() :: brod:group_member_id().
-type state() ::
          #state{client :: brod:client(),
                 client_mref :: reference(),
                 groupId :: brod:group_id(),
                 memberId :: undefined | member_id(),
                 generationId :: undefined | brod:group_generation_id(),
                 coordinator :: pid(),
                 consumers :: [consumer()],
                 consumer_config :: brod:consumer_config(),
                 is_blocked :: boolean(),
                 subscribe_tref :: undefined | reference(),
                 cb_module :: module(),
                 cb_state :: cb_state(),
                 message_type :: message | message_set}.

Callbacks

-callback handle_message(brod:topic(), brod:partition(), brod:message() | brod:message_set(), cb_state()) ->
                            {ok, cb_state()} | {ok, ack, cb_state()} | {ok, ack_no_commit, cb_state()}.
-callback init(brod:group_id(), term()) -> {ok, cb_state()}.

Functions

Link to this function

ack(Pid, Topic, Partition, Offset)

View Source
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
Acknowledge and commit an offset. The subscriber may ack a later (greater) offset which will be considered as multi-acking the earlier (smaller) offsets. This also means that disordered acks may overwrite offset commits and lead to unnecessary message re-delivery in case of restart.
Link to this function

ack(Pid, Topic, Partition, Offset, Commit)

View Source
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset(), boolean()) -> ok.
Acknowledge an offset. This call may or may not commit group subscriber offset depending on the value of Commit argument
Link to this function

assign_partitions(Pid, Members, TopicPartitionList)

View Source
-spec assign_partitions(pid(), [brod:group_member()], [{brod:topic(), brod:partition()}]) ->
                           [{member_id(), [brod:partition_assignment()]}].
This function is called only when partition_assignment_strategy is set for callback_implemented in group config.
Link to this function

assignments_received(Pid, MemberId, GenerationId, TopicAssignments)

View Source
-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.
Called by group coordinator when there is new assignment received.
Link to this function

assignments_revoked(Pid)

View Source
-spec assignments_revoked(pid()) -> ok.
Called by group coordinator before re-joining the consumer group.
Link to this function

code_change(OldVsn, State, Extra)

View Source
-spec commit(pid()) -> ok.
Commit all acked offsets. NOTE: This is an async call.
Link to this function

commit(Pid, Topic, Partition, Offset)

View Source
-spec commit(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
Commit offset for a topic. This is an asynchronous call
Link to this function

get_committed_offsets(Pid, TopicPartitions)

View Source
-spec get_committed_offsets(pid(), [{brod:topic(), brod:partition()}]) ->
                               {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}.

Called by group coordinator when initializing the assignments for subscriber.

NOTE: This function is called only when offset_commit_policy is set to consumer_managed in group config.

NOTE: The committed offsets should be the offsets for successfully processed (acknowledged) messages, not the begin_offset to start fetching from.
Link to this function

handle_call(Call, From, State)

View Source
Link to this function

handle_cast(Cast, State)

View Source
Link to this function

handle_info(Info, State0)

View Source
Link to this function

start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, CbModule, CbInitArg)

View Source
-spec start_link(brod:client(),
                 brod:group_id(),
                 [brod:topic()],
                 brod:group_config(),
                 brod:consumer_config(),
                 module(),
                 term()) ->
                    {ok, pid()} | {error, any()}.

Equivalent to start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, message, CbModule, CbInitArg).

Handle a message. Return one of:

{ok, NewCallbackState}: The subscriber has received the message for processing async-ly. It should call brod_group_subscriber:ack/4 to acknowledge later.

{ok, ack, NewCallbackState}: The subscriber has completed processing the message.

{ok, ack_no_commit, NewCallbackState}: The subscriber has completed processing the message, but it is not ready to commit offset yet. It should call brod_group_subscriber:commit/4 later.

While this callback function is being evaluated, the fetch-ahead partition-consumers are fetching more messages behind the scene unless prefetch_count and prefetch_bytes are set to 0 in consumer config.
Link to this function

start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, MessageType, CbModule, CbInitArg)

View Source
-spec start_link(brod:client(),
                 brod:group_id(),
                 [brod:topic()],
                 brod:group_config(),
                 brod:consumer_config(),
                 message | message_set,
                 module(),
                 term()) ->
                    {ok, pid()} | {error, any()}.

Start (link) a group subscriber.

Client: Client ID (or pid, but not recommended) of the brod client.

GroupId: Consumer group ID which should be unique per kafka cluster

Topics: Predefined set of topic names to join the group.

NOTE: The group leader member will collect topics from all members and assign all collected topic-partitions to members in the group. i.e. members can join with arbitrary set of topics.

GroupConfig: For group coordinator, see brod_group_coordinator:start_link/6

ConsumerConfig: For partition consumer, see brod_consumer:start_link/4

MessageType: The type of message that is going to be handled by the callback module. Can be either message or message_set.

CbModule: Callback module which should have the callback functions implemented for message processing.

CbInitArg: The term() that is going to be passed to CbModule:init/2 as a second argument when initializing the subscriber.
-spec stop(pid()) -> ok.
Stop group subscriber, wait for pid DOWN before return.
Link to this function

terminate(Reason, State)

View Source