brod_group_subscriber_v2 behaviour (brod v4.3.3)
This module implements an improved version of the brod_group_subscriber behavior. The key difference is that each partition worker runs in a separate Erlang process, allowing parallel message processing.
Summary
Functions
Commit offset for a topic-partition, but don't commit it to Kafka. This is an asynchronous call.
This function is called only when partition_assignment_strategy is set to callback_implemented in group config.
Called by group coordinator when a new assignment is received.
Called by group coordinator before re-joining the consumer group.
Ack offset for a topic-partition. This is an asynchronous call.
Called by group coordinator when initializing the assignments for subscriber.
Returns a map from Topic-Partitions to worker PIDs for the given group. Useful for health checking. This is a synchronous call.
Start (link) a group subscriber.
Stop group subscriber, wait for pid DOWN before return.
Types
-type ack_fun() :: fun((brod:offset()) -> ok).
-type commit_fun() :: fun((brod:offset()) -> ok).
-type committed_offsets() :: #{brod:topic_partition() => {brod:offset(), boolean()}}.
-type init_info() :: #{group_id := brod:group_id(), topic := brod:topic(), partition := brod:partition(), commit_fun := commit_fun(), ack_fun := ack_fun()}.
-type member_id() :: brod:group_member_id().
-type state() :: #state{config :: subscriber_config(), message_type :: message | message_set, group_id :: brod:group_id(), coordinator :: undefined | pid(), generation_id :: integer() | undefined, workers :: workers(), committed_offsets :: committed_offsets(), cb_module :: module(), cb_config :: term(), client :: brod:client()}.
-type subscriber_config() :: #{client := brod:client(), group_id := brod:group_id(), topics := [brod:topic()], cb_module := module(), init_data => term(), message_type => message | message_set, consumer_config => brod:consumer_config(), group_config => brod:group_config()}.
-type worker() :: pid().
-type workers() :: #{brod:topic_partition() => worker()}.
Callbacks
-callback assign_partitions(_CbConfig, [brod:group_member()], [brod:topic_partition()]) -> [{member_id(), [brod:partition_assignment()]}].
-callback get_committed_offset(_CbConfig, brod:topic(), brod:partition()) -> {ok, brod:offset() | {begin_offset, brod:offset_time()}} | undefined.
-callback handle_message(brod:message() | brod:message_set(), State) -> {ok, commit, State} | {ok, ack, State} | {ok, State}.
-callback init(brod_group_subscriber_v2:init_info(), _CbConfig) -> {ok, _State}.
-callback terminate(_Reason, _State) -> _.
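As an illustration of the callbacks above, here is a minimal sketch of a callback module. The module name my_subscriber and the state map are hypothetical. It assumes message_type is message, so handle_message/2 receives a #kafka_message{} record from brod.hrl, and it always returns {ok, commit, State}; a real handler might return {ok, ack, State} or {ok, State} instead.

```erlang
-module(my_subscriber).
-behaviour(brod_group_subscriber_v2).

%% Provides the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/2, terminate/2]).

%% Called once per partition worker. InitInfo follows the init_info()
%% type; _CbConfig is the init_data term from the subscriber config.
init(#{topic := Topic, partition := Partition}, _CbConfig) ->
    {ok, #{topic => Topic, partition => Partition, count => 0}}.

%% Called for each message when message_type is message.
handle_message(#kafka_message{offset = Offset, value = Value},
               State = #{topic := Topic, partition := Partition, count := N}) ->
    io:format("~s-~p offset ~p: ~p~n", [Topic, Partition, Offset, Value]),
    %% Ask the subscriber to commit this offset.
    {ok, commit, State#{count := N + 1}}.

%% Called when the worker stops; nothing to clean up in this sketch.
terminate(_Reason, _State) ->
    ok.
```

Because each partition gets its own worker process, the count field in this sketch is tracked per partition, not per topic.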
Functions
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
-spec assign_partitions(pid(), [brod:group_member()], [brod:topic_partition()]) -> [{member_id(), [brod:partition_assignment()]}].
This function is called only when partition_assignment_strategy is set to callback_implemented in group config.
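The assign_partitions/3 callback could, for example, distribute partitions round-robin. The sketch below is one possible strategy, not the one brod uses internally. The module name round_robin_assignor is hypothetical, and it assumes each group member is a {MemberId, Metadata} tuple and each partition assignment is a {Topic, [Partition]} tuple.

```erlang
-module(round_robin_assignor).
-export([assign_partitions/3]).

%% Assign the I-th topic-partition to member number (I rem N).
assign_partitions(_CbConfig, Members, TopicPartitions) ->
    MemberIds = [Id || {Id, _Metadata} <- Members],
    N = length(MemberIds),
    Indexed = lists:zip(lists:seq(0, length(TopicPartitions) - 1),
                        TopicPartitions),
    [{Id, group_by_topic([TP || {I, TP} <- Indexed, I rem N =:= Idx])}
     || {Idx, Id} <- lists:zip(lists:seq(0, N - 1), MemberIds)].

%% Turn [{Topic, Partition}] into [{Topic, [Partition]}],
%% keeping the partitions of each topic in their original order.
group_by_topic(TopicPartitions) ->
    Grouped = lists:foldl(
                fun({Topic, Partition}, Acc) ->
                        maps:update_with(Topic,
                                         fun(Ps) -> [Partition | Ps] end,
                                         [Partition], Acc)
                end, #{}, TopicPartitions),
    [{Topic, lists:reverse(Ps)} || {Topic, Ps} <- maps:to_list(Grouped)].
```

A member that receives no partitions still appears in the result with an empty assignment list.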
assignments_received(Pid, MemberId, GenerationId, TopicAssignments)
-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.
-spec assignments_revoked(pid()) -> ok.
-spec commit(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
-spec get_committed_offsets(pid(), [brod:topic_partition()]) -> {ok, [{brod:topic_partition(), brod:offset()}]}.
Called by group coordinator when initializing the assignments for subscriber.
NOTE: This function is called only when offset_commit_policy is set to consumer_managed in group config.
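When offsets are consumer managed, the callback module supplies them through the get_committed_offset/3 callback. The sketch below keeps offsets in an ETS table purely for illustration; the module name, table name, and the init_store/0 and store_offset/3 helpers are all hypothetical, and a real deployment would presumably use durable storage, since ETS contents are lost when the owning process or node stops.

```erlang
-module(offset_store_example).
-export([init_store/0, store_offset/3, get_committed_offset/3]).

-define(TABLE, example_offsets).

%% Create the in-memory table (hypothetical helper, call once).
init_store() ->
    ?TABLE = ets:new(?TABLE, [named_table, public, set]),
    ok.

%% Record an offset for a topic-partition (hypothetical helper).
store_offset(Topic, Partition, Offset) ->
    true = ets:insert(?TABLE, {{Topic, Partition}, Offset}),
    ok.

%% Callback: return the stored offset, or undefined if none is known.
get_committed_offset(_CbConfig, Topic, Partition) ->
    case ets:lookup(?TABLE, {Topic, Partition}) of
        [{_Key, Offset}] -> {ok, Offset};
        []               -> undefined
    end.
```

Per the callback spec, the function may also return {ok, {begin_offset, OffsetTime}} to start from a given position instead of a stored offset.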
-spec start_link(subscriber_config()) -> {ok, pid()} | {error, any()}.
Start (link) a group subscriber.
Possible Config keys:
client: Client ID (or pid, but not recommended) of the brod client. Mandatory.
group_id: Consumer group ID, which should be unique per Kafka cluster. Mandatory.
NOTE: The group leader member will collect topics from all members and assign all collected topic-partitions to members in the group, i.e. members can join with an arbitrary set of topics.
topics: Predefined set of topic names to join the group. Mandatory.
cb_module: Callback module which should have the callback functions implemented for message processing. Mandatory.
group_config: For group coordinator, see brod_group_coordinator:start_link/6. Optional.
consumer_config: For partition consumer, see brod_consumer:start_link/5. Optional.
message_type: The type of message that is going to be handled by the callback module. Can be either message or message_set. Optional, defaults to message.
init_data: The term() that is going to be passed to CbModule:init/2 when initializing the subscriber. Optional, defaults to undefined.
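A subscriber config built from the keys above might look like the following sketch. The broker address, client ID, group ID, topic name, and the my_subscriber callback module are placeholders, and the consumer_config and group_config values are only examples of options those components accept.

```erlang
-module(subscriber_example).
-export([start/0]).

start() ->
    {ok, _Apps} = application:ensure_all_started(brod),
    %% Start a brod client; host and port are placeholders.
    ok = brod:start_client([{"localhost", 9092}], example_client),
    Config = #{client          => example_client,
               group_id        => <<"example-group">>,
               topics          => [<<"example-topic">>],
               %% Hypothetical module implementing the callbacks.
               cb_module       => my_subscriber,
               init_data       => undefined,
               message_type    => message,
               consumer_config => [{begin_offset, earliest}],
               group_config    => [{offset_commit_policy, commit_to_kafka_v2},
                                   {offset_commit_interval_seconds, 5}]},
    %% Returns {ok, Pid} | {error, Reason}.
    brod_group_subscriber_v2:start_link(Config).
```

The returned pid can later be passed to brod_group_subscriber_v2:stop/1 to shut the subscriber down.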
-spec stop(pid()) -> ok.
Stop group subscriber, wait for pid DOWN before return.