View Source brod_cg_commits (brod v4.0.0)

This is a utility module to help force commit offsets to kafka.

Summary

Functions

This function is called only when partition_assignment_strategy is set for callback_implemented in group config.
Called by group coordinator when there is new assignment received.
Called by group coordinator before re-joinning the consumer group.
Called by group coordinator when initializing the assignments for subscriber. NOTE: this function is called only when it is DISABLED to commit offsets to kafka. i.e. offset_commit_policy is set to consumer_managed
Force commit offsets.
Start (link) a group member. The member will try to join the consumer group and get assignments for the given topic-partitions, then commit given offsets to kafka. In case not all given partitions are assigned to it, it will terminate with an exit exception
Stop the process.
Make a call to the resetter process, the call will be blocked until offsets are committed.

Types

-type group_id() :: brod:group_id().
-type group_input() :: [{prop_key(), prop_val()}].
-type member_id() :: brod:group_member_id().
-type offset() :: brod:offset().
-type offsets() :: latest | earliest | [{partition(), offset()}].
-type partition() :: brod:partition().
-type pending_sync() :: undefined | gen_server:from().
-type prop_key() :: id | topic | retention | protocol | offsets.
-type prop_val() ::
          group_id() | topic() | retention() | offsets() | brod_group_coordinator:protocol_name().
-type retention() :: integer().
-1 to use whatever configured in kafka
-type state() ::
          #state{client :: brod:client(),
                 groupId :: brod:group_id(),
                 memberId :: undefined | member_id(),
                 generationId :: undefined | brod:group_generation_id(),
                 coordinator :: pid(),
                 topic :: undefined | topic(),
                 offsets :: undefined | offsets(),
                 is_elected :: boolean(),
                 pending_sync :: pending_sync(),
                 is_done :: boolean()}.
-type topic() :: brod:topic().

Functions

Link to this function

assign_partitions(Pid, Members, TopicPartitionList)

View Source
-spec assign_partitions(pid(), [brod:group_member()], [{brod:topic(), brod:partition()}]) ->
                           [{member_id(), [brod:partition_assignment()]}].
This function is called only when partition_assignment_strategy is set for callback_implemented in group config.
Link to this function

assignments_received(Pid, MemberId, GenerationId, TopicAssignments)

View Source
-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.
Called by group coordinator when there is new assignment received.
Link to this function

assignments_revoked(Pid)

View Source
-spec assignments_revoked(pid()) -> ok.
Called by group coordinator before re-joinning the consumer group.
Link to this function

code_change(OldVsn, State, Extra)

View Source
Link to this function

get_committed_offsets(Pid, TopicPartitions)

View Source
-spec get_committed_offsets(pid(), [{brod:topic(), brod:partition()}]) ->
                               {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}.
Called by group coordinator when initializing the assignments for subscriber. NOTE: this function is called only when it is DISABLED to commit offsets to kafka. i.e. offset_commit_policy is set to consumer_managed
Link to this function

handle_call(Call, From, State0)

View Source
Link to this function

handle_cast(Cast, State)

View Source
Link to this function

handle_info(Info, State)

View Source
Link to this function

run(ClientId, GroupInput)

View Source
Force commit offsets.
Link to this function

start_link(Client, GroupInput)

View Source
-spec start_link(brod:client(), group_input()) -> {ok, pid()} | {error, any()}.
Start (link) a group member. The member will try to join the consumer group and get assignments for the given topic-partitions, then commit given offsets to kafka. In case not all given partitions are assigned to it, it will terminate with an exit exception
-spec stop(pid()) -> ok.
Stop the process.
-spec sync(pid()) -> ok.
Make a call to the resetter process, the call will be blocked until offsets are committed.
Link to this function

terminate(Reason, State)

View Source