brod_group_subscriber_v2 behaviour (brod v4.3.3)

This module implements an improved version of the brod_group_subscriber behaviour. The key difference is that each partition worker runs in a separate Erlang process, which allows messages from different partitions to be processed in parallel.

Callbacks are documented in the source code of this module.

Summary

Functions

ack/4: Ack an offset for a topic-partition without committing it to Kafka. This is an asynchronous call.

assign_partitions/3: Called only when partition_assignment_strategy is set to callback_implemented in group config.

assignments_received/4: Called by the group coordinator when a new assignment is received.

assignments_revoked/1: Called by the group coordinator before re-joining the consumer group.

commit/4: Commit an offset for a topic-partition. This is an asynchronous call.

get_committed_offsets/2: Called by the group coordinator when initializing the assignments for the subscriber.

get_workers/1: Returns a map from topic-partitions to worker PIDs for the given group. Useful for health checking. This is a synchronous call.

start_link/1: Start (link) a group subscriber.

stop/1: Stop the group subscriber and wait for its pid DOWN before returning.

Types

-type ack_fun() :: fun((brod:offset()) -> ok).
-type commit_fun() :: fun((brod:offset()) -> ok).
-type committed_offsets() :: #{brod:topic_partition() => {brod:offset(), boolean()}}.
-type init_info() ::
          #{group_id := brod:group_id(),
            topic := brod:topic(),
            partition := brod:partition(),
            commit_fun := commit_fun(),
            ack_fun := ack_fun()}.
-type member_id() :: brod:group_member_id().
-type state() ::
          #state{config :: subscriber_config(),
                 message_type :: message | message_set,
                 group_id :: brod:group_id(),
                 coordinator :: undefined | pid(),
                 generation_id :: integer() | undefined,
                 workers :: workers(),
                 committed_offsets :: committed_offsets(),
                 cb_module :: module(),
                 cb_config :: term(),
                 client :: brod:client()}.
-type subscriber_config() ::
          #{client := brod:client(),
            group_id := brod:group_id(),
            topics := [brod:topic()],
            cb_module := module(),
            init_data => term(),
            message_type => message | message_set,
            consumer_config => brod:consumer_config(),
            group_config => brod:group_config()}.
-type worker() :: pid().
-type workers() :: #{brod:topic_partition() => worker()}.

Callbacks

assign_partitions/3

(optional)
-callback assign_partitions(_CbConfig, [brod:group_member()], [brod:topic_partition()]) ->
                               [{member_id(), [brod:partition_assignment()]}].
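
As an illustration of the callback's shape, here is a minimal round-robin sketch. It assumes that each brod:group_member() is a {MemberId, Metadata} tuple, that each brod:topic_partition() is a {Topic, Partition} tuple, and that each brod:partition_assignment() is a {Topic, [Partition]} tuple. The module name rr_assign and the helper group_by_topic/1 are hypothetical.

```erlang
-module(rr_assign).
-export([assign_partitions/3]).

%% Spread topic-partitions over members in round-robin order.
%% Assumption: members are {MemberId, Metadata} tuples.
assign_partitions(_CbConfig, Members, TopicPartitions) ->
    MemberIds = lists:sort([Id || {Id, _Meta} <- Members]),
    N = length(MemberIds),
    Indexed = lists:zip(lists:seq(0, length(TopicPartitions) - 1),
                        TopicPartitions),
    [{Id, group_by_topic([TP || {I, TP} <- Indexed, I rem N =:= Idx])}
     || {Idx, Id} <- lists:zip(lists:seq(0, N - 1), MemberIds)].

%% Turn [{Topic, Partition}] into [{Topic, [Partition]}].
group_by_topic(TopicPartitions) ->
    Map = lists:foldl(
            fun({Topic, Partition}, Acc) ->
                    maps:update_with(Topic,
                                     fun(Ps) -> [Partition | Ps] end,
                                     [Partition],
                                     Acc)
            end, #{}, TopicPartitions),
    [{Topic, lists:sort(Ps)} || {Topic, Ps} <- maps:to_list(Map)].
```

Sorting the member IDs keeps the result deterministic regardless of the order in which members are listed.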

get_committed_offset/3

(optional)
-callback get_committed_offset(_CbConfig, brod:topic(), brod:partition()) ->
                                  {ok, brod:offset() | {begin_offset, brod:offset_time()}} | undefined.

handle_message/2

-callback handle_message(brod:message() | brod:message_set(), State) ->
                            {ok, commit, State} | {ok, ack, State} | {ok, State}.

init/2

-callback init(brod_group_subscriber_v2:init_info(), _CbConfig) -> {ok, _State}.

terminate/2

-callback terminate(_Reason, _State) -> _.
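
A minimal callback module might look like the following sketch. The module name my_subscriber and the state layout are hypothetical. The sketch only assumes what the callback specs above state: init/2 receives an init_info() map plus the callback config, and handle_message/2 returns one of the three documented tuples.

```erlang
-module(my_subscriber).
-behaviour(brod_group_subscriber_v2).

-export([init/2, handle_message/2, terminate/2]).

%% Build the initial state from the init_info() map.
%% The state shape (a map with a message counter) is an assumption.
init(#{topic := Topic, partition := Partition}, CbConfig) ->
    {ok, #{topic => Topic,
           partition => Partition,
           config => CbConfig,
           seen => 0}}.

%% Count the message and request a commit of its offset.
%% Real code would process _Message before returning.
handle_message(_Message, #{seen := Seen} = State) ->
    {ok, commit, State#{seen := Seen + 1}}.

terminate(_Reason, _State) ->
    ok.
```

Returning {ok, commit, State} is the simplest choice; {ok, ack, State} and {ok, State} are the other return values permitted by the spec.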

Functions

ack(Pid, Topic, Partition, Offset)

-spec ack(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
Ack an offset for a topic-partition without committing it to Kafka. This is an asynchronous call.

assign_partitions(Pid, Members, TopicPartitionList)

-spec assign_partitions(pid(), [brod:group_member()], [brod:topic_partition()]) ->
                           [{member_id(), [brod:partition_assignment()]}].
This function is called only when partition_assignment_strategy is set to callback_implemented in group config.

assignments_received(Pid, MemberId, GenerationId, TopicAssignments)

-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.
Called by the group coordinator when a new assignment is received.

assignments_revoked(Pid)

-spec assignments_revoked(pid()) -> ok.
Called by the group coordinator before re-joining the consumer group.

commit(Pid, Topic, Partition, Offset)

-spec commit(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
Commit an offset for a topic-partition. This is an asynchronous call.
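
One way to combine acking and committing is to ack every message but commit only once per batch. The sketch below does this with the commit_fun from init_info(). It assumes that commit_fun is equivalent to calling commit/4 for the worker's own topic-partition, and that message_type is message so that handle_message/2 receives a #kafka_message{} record. The module name and the batch size are hypothetical.

```erlang
-module(batching_subscriber).
-behaviour(brod_group_subscriber_v2).

%% Provides the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/2]).

%% Hypothetical batch size: commit once per 100 messages.
-define(COMMIT_EVERY, 100).

init(#{commit_fun := CommitFun}, _CbConfig) ->
    {ok, #{commit_fun => CommitFun, uncommitted => 0}}.

handle_message(#kafka_message{offset = Offset},
               #{commit_fun := CommitFun, uncommitted := N} = State) ->
    case N + 1 >= ?COMMIT_EVERY of
        true ->
            %% Commit explicitly, then return without a commit/ack tag.
            ok = CommitFun(Offset),
            {ok, State#{uncommitted := 0}};
        false ->
            %% Ack only: nothing is committed to Kafka for this message.
            {ok, ack, State#{uncommitted := N + 1}}
    end.
```

With this approach, up to ?COMMIT_EVERY messages may be redelivered after a restart, so message handling should be idempotent.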

get_committed_offsets(Pid, TopicPartitions)

-spec get_committed_offsets(pid(), [brod:topic_partition()]) ->
                               {ok, [{brod:topic_partition(), brod:offset()}]}.

Called by the group coordinator when initializing the assignments for the subscriber.

NOTE: This function is called only when offset_commit_policy is set to consumer_managed in group config.

get_workers(Pid)

-spec get_workers(pid()) -> workers().
Returns a map from topic-partitions to worker PIDs for the given group. Useful for health checking. This is a synchronous call.
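
A health check built on get_workers/1 could look like the sketch below. The module name subscriber_health and both function names are hypothetical. It assumes the workers run on the local node, because is_process_alive/1 only accepts local pids.

```erlang
-module(subscriber_health).
-export([dead_workers/1, check/1]).

%% Given a workers() map, list the topic-partitions whose worker
%% process is no longer alive.
dead_workers(Workers) ->
    [TP || {TP, Pid} <- maps:to_list(Workers),
           not is_process_alive(Pid)].

%% Fetch the worker map from the group subscriber and report dead workers.
check(SubscriberPid) ->
    Workers = brod_group_subscriber_v2:get_workers(SubscriberPid),
    case dead_workers(Workers) of
        [] -> ok;
        Dead -> {error, {dead_workers, Dead}}
    end.
```

Because get_workers/1 is a synchronous call, check/1 blocks until the group subscriber replies.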

handle_call(Call, From, State)

start_link(Config)

-spec start_link(subscriber_config()) -> {ok, pid()} | {error, any()}.

Start (link) a group subscriber.

Possible Config keys:

  • client: Client ID (or pid, but not recommended) of the brod client. Mandatory.
  • group_id: Consumer group ID, which should be unique per Kafka cluster. Mandatory.
  • topics: Predefined set of topic names to join the group. Mandatory.

    NOTE: The group leader member will collect topics from all members and assign all collected topic-partitions to members in the group. That is, members can join with an arbitrary set of topics.
  • cb_module: Callback module which should have the callback functions implemented for message processing. Mandatory.
  • group_config: Configuration for the group coordinator, see brod_group_coordinator:start_link/6. Optional.
  • consumer_config: Configuration for the partition consumer, see brod_consumer:start_link/5. Optional.
  • message_type: The type of message that is going to be handled by the callback module. Can be either message or message_set. Optional, defaults to message.
  • init_data: The term() that is going to be passed to CbModule:init/2 when initializing the subscriber. Optional, defaults to undefined.
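
The keys above could be combined as in the following sketch. The client ID my_client, the group and topic names, and the callback module my_subscriber are hypothetical. The sketch assumes a brod client with that ID has already been started. The consumer and group options shown are examples, not recommendations.

```erlang
-module(my_subscriber_starter).
-export([start/0]).

start() ->
    Config = #{client => my_client,            %% hypothetical client ID
               group_id => <<"my-group">>,     %% hypothetical group ID
               topics => [<<"my-topic">>],     %% hypothetical topic
               cb_module => my_subscriber,     %% hypothetical callback module
               init_data => #{},               %% passed to my_subscriber:init/2
               message_type => message,
               consumer_config => [{begin_offset, earliest}],
               group_config => [{offset_commit_policy, commit_to_kafka_v2},
                                {offset_commit_interval_seconds, 5}]},
    brod_group_subscriber_v2:start_link(Config).
```

Only client, group_id, topics and cb_module are mandatory; the remaining keys can be left out to use the defaults.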

stop(Pid)

-spec stop(pid()) -> ok.
Stop the group subscriber and wait for its pid DOWN before returning.