View Source brod_cg_commits (brod v4.4.0)

This is a utility module to help force commit offsets to kafka.

Summary

Functions

This function is called only when partition_assignment_strategy is set for callback_implemented in group config.

Called by group coordinator when there is new assignment received.

Called by group coordinator before re-joinning the consumer group.

Called by group coordinator when initializing the assignments for subscriber. NOTE: this function is called only when it is DISABLED to commit offsets to kafka. i.e. offset_commit_policy is set to consumer_managed

Force commit offsets.

Start (link) a group member. The member will try to join the consumer group and get assignments for the given topic-partitions, then commit given offsets to kafka. In case not all given partitions are assigned to it, it will terminate with an exit exception

Stop the process.

Make a call to the resetter process, the call will be blocked until offsets are committed.

Types

group_id/0

-type group_id() :: brod:group_id().

group_input/0

-type group_input() :: [{prop_key(), prop_val()}].

member_id/0

-type member_id() :: brod:group_member_id().

offset/0

-type offset() :: brod:offset().

offsets/0

-type offsets() :: latest | earliest | [{partition(), offset()}].

partition/0

-type partition() :: brod:partition().

pending_sync/0

-type pending_sync() :: undefined | gen_server:from().

prop_key/0

-type prop_key() :: id | topic | retention | protocol | offsets.

prop_val/0

-type prop_val() ::
          group_id() | topic() | retention() | offsets() | brod_group_coordinator:protocol_name().

retention/0

-type retention() :: integer().

-1 to use whatever configured in kafka

state/0

-type state() ::
          #state{client :: brod:client(),
                 groupId :: brod:group_id(),
                 memberId :: undefined | member_id(),
                 generationId :: undefined | brod:group_generation_id(),
                 coordinator :: pid(),
                 topic :: undefined | topic(),
                 offsets :: undefined | offsets(),
                 is_elected :: boolean(),
                 pending_sync :: pending_sync(),
                 is_done :: boolean()}.

topic/0

-type topic() :: brod:topic().

Functions

assign_partitions(Pid, Members, TopicPartitionList)

-spec assign_partitions(pid(), [brod:group_member()], [{brod:topic(), brod:partition()}]) ->
                           [{member_id(), [brod:partition_assignment()]}].

This function is called only when partition_assignment_strategy is set for callback_implemented in group config.

assignments_received(Pid, MemberId, GenerationId, TopicAssignments)

-spec assignments_received(pid(), member_id(), integer(), brod:received_assignments()) -> ok.

Called by group coordinator when there is new assignment received.

assignments_revoked(Pid)

-spec assignments_revoked(pid()) -> ok.

Called by group coordinator before re-joinning the consumer group.

code_change(OldVsn, State, Extra)

get_committed_offsets(Pid, TopicPartitions)

-spec get_committed_offsets(pid(), [{brod:topic(), brod:partition()}]) ->
                               {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}.

Called by group coordinator when initializing the assignments for subscriber. NOTE: this function is called only when it is DISABLED to commit offsets to kafka. i.e. offset_commit_policy is set to consumer_managed

handle_call(Call, From, State0)

handle_cast(Cast, State)

handle_info(Info, State)

init(_)

run(ClientId, GroupInput)

Force commit offsets.

start_link(Client, GroupInput)

-spec start_link(brod:client(), group_input()) -> {ok, pid()} | {error, any()}.

Start (link) a group member. The member will try to join the consumer group and get assignments for the given topic-partitions, then commit given offsets to kafka. In case not all given partitions are assigned to it, it will terminate with an exit exception

stop(Pid)

-spec stop(pid()) -> ok.

Stop the process.

sync(Pid)

-spec sync(pid()) -> ok.

Make a call to the resetter process, the call will be blocked until offsets are committed.

terminate(Reason, State)