Module brod_group_subscriber

A group subscriber is a gen_server that subscribes to partition consumers (pollers) and calls user-defined callback functions to process messages.

Behaviours: brod_group_member, gen_server.

This module defines the brod_group_subscriber behaviour.
Required callback functions: init/2, handle_message/4.

Description

A group subscriber is a gen_server that subscribes to partition consumers (pollers) and calls user-defined callback functions to process messages.

An overview of what it does behind the scenes:
  1. Start a consumer group coordinator to manage the consumer group state, see brod_group_coordinator:start_link/6.
  2. Start (if not already started) topic consumers (pollers) and subscribe to the partition workers when a group assignment is received from the group leader, see brod:start_consumer/3.
  3. Call CallbackModule:handle_message/4 when messages are received from the partition consumers.
  4. Send acknowledged offsets to the group coordinator, which commits them to Kafka periodically.
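
The steps above can be sketched with a minimal callback module. This is an illustration, not the library's reference example: the module name, group ID, topic name, and config values are hypothetical, and the callback return values ({ok, State} from init/2 and {ok, ack, State} from handle_message/4) are assumed from brod's behaviour definition rather than stated on this page.

```erlang
-module(example_subscriber).
-behaviour(brod_group_subscriber).

%% Provides the #kafka_message{} record definition.
-include_lib("brod/include/brod.hrl").

-export([start/1]).
-export([init/2, handle_message/4]).

%% Called once when the subscriber starts; the second argument is CbInitArg.
init(_GroupId, _CbInitArg) ->
  {ok, #{}}.

%% Called for each received message. Returning 'ack' (assumed return shape)
%% acknowledges the offset so that it can be committed.
handle_message(Topic, Partition,
               #kafka_message{offset = Offset, value = Value}, State) ->
  io:format("~s-~p offset=~p value=~p~n", [Topic, Partition, Offset, Value]),
  {ok, ack, State}.

%% Client is the ID of an already started brod client.
start(Client) ->
  GroupId = <<"example-group">>,          % hypothetical group ID
  Topics = [<<"example-topic">>],         % hypothetical topic
  GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                 {offset_commit_interval_seconds, 5}],
  ConsumerConfig = [{begin_offset, earliest}],
  brod_group_subscriber:start_link(Client, GroupId, Topics,
                                   GroupConfig, ConsumerConfig,
                                   ?MODULE, []).
```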

Data Types

member_id()

member_id() = brod:group_member_id()

Function Index

ack/4                    Acknowledge and commit an offset.
ack/5                    Acknowledge an offset.
assign_partitions/3      This function is called only when partition_assignment_strategy is set to callback_implemented in group config.
assignments_received/4   Called by the group coordinator when a new assignment is received.
assignments_revoked/1    Called by the group coordinator before re-joining the consumer group.
code_change/3
commit/1                 Commit all acked offsets.
commit/4                 Commit an offset for a topic-partition.
get_committed_offsets/2  Called by the group coordinator when initializing the assignments for the subscriber.
handle_call/3
handle_cast/2
handle_info/2
init/1
start_link/7             Equivalent to start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, message, CbModule, CbInitArg).
start_link/8             Start (link) a group subscriber.
stop/1                   Stop the group subscriber and wait for the pid's DOWN message before returning.
terminate/2
user_data/1

Function Details

ack/4

ack(Pid::pid(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset()) -> ok

Acknowledge and commit an offset. The subscriber may ack a later (greater) offset, which is treated as acknowledging all earlier (smaller) offsets as well. This also means that out-of-order acks may overwrite offset commits and lead to unnecessary message re-delivery after a restart.
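
One way to avoid out-of-order acks when messages are processed concurrently is to release an ack only once every earlier offset has finished. The helper below is a hypothetical sketch, not part of brod. It assumes offsets within a partition are consecutive integers, which does not hold for compacted topics.

```erlang
-module(ack_tracker).
-export([new/1, done/2]).

%% Tracker is {Next, Done}: Next is the lowest offset not yet finished,
%% Done is an ordset of finished offsets greater than Next.
new(FirstOffset) ->
  {FirstOffset, ordsets:new()}.

%% Record that Offset finished processing.
%% Returns {AckUpTo, NewTracker}, where AckUpTo is the highest offset that is
%% now safe to ack, or 'none' if an earlier offset is still pending.
done(Offset, {Next, _Done} = Tracker) when Offset < Next ->
  %% Already covered by an earlier ack; nothing to do.
  {none, Tracker};
done(Offset, {Next, Done0}) ->
  Done = ordsets:add_element(Offset, Done0),
  advance(Next, Done, none).

%% Consume finished offsets from the front while they are contiguous.
advance(Next, [Next | Rest], _Ack) ->
  advance(Next + 1, Rest, Next);
advance(Next, Done, Ack) ->
  {Ack, {Next, Done}}.
```

When done/2 returns an offset instead of none, the caller would pass that offset to ack/4 for the corresponding topic and partition.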

ack/5

ack(Pid::pid(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset(), Commit::boolean()) -> ok

Acknowledge an offset. Whether this call also commits the group subscriber offset depends on the value of the Commit argument.
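
As a sketch of how ack/5 and commit/1 might be combined, the hypothetical helper below acks a batch of offsets without committing and then requests a single commit. It uses only the signatures listed on this page. The module and function names are illustrative, and the assumption that uncommitted acks are held until commit/1 is inferred from the descriptions here.

```erlang
-module(batch_ack_example).
-export([ack_batch/4]).

%% Ack each offset in ascending order without committing, then commit once.
%% Pid is the group subscriber pid.
ack_batch(Pid, Topic, Partition, Offsets) ->
  lists:foreach(
    fun(Offset) ->
        ok = brod_group_subscriber:ack(Pid, Topic, Partition, Offset, false)
    end,
    lists:sort(Offsets)),
  %% Asynchronous: returns ok before the commit reaches Kafka.
  brod_group_subscriber:commit(Pid).
```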

assign_partitions/3

assign_partitions(Pid::pid(), Members::[brod:group_member()], TopicPartitionList::[{brod:topic(), brod:partition()}]) -> [{member_id(), [brod:partition_assignment()]}]

This function is called only when partition_assignment_strategy is set to callback_implemented in group config.
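
A callback module that opts into callback_implemented needs to supply its own assignment logic. The sketch below shows a simple round-robin assignment. It assumes the callback takes (Members, TopicPartitions, CbState), that each member is a {MemberId, Metadata} tuple, and that a partition assignment has the shape {Topic, [Partition]}. These shapes are taken from brod's type definitions as best understood, not from this page.

```erlang
-module(example_assignor).
-export([assign_partitions/3]).

%% Distribute topic-partitions across members in round-robin order.
assign_partitions(Members, TopicPartitions, _CbState) ->
  MemberIds = [Id || {Id, _Meta} <- Members],
  N = length(MemberIds),
  Indexed = lists:zip(lists:seq(0, length(TopicPartitions) - 1),
                      TopicPartitions),
  [{Id, group_by_topic([TP || {I, TP} <- Indexed, I rem N =:= Idx])}
   || {Idx, Id} <- lists:zip(lists:seq(0, N - 1), MemberIds)].

%% Turn [{Topic, Partition}] into [{Topic, [Partition]}].
group_by_topic(TPs) ->
  Topics = lists:usort([T || {T, _P} <- TPs]),
  [{T, [P || {T1, P} <- TPs, T1 =:= T]} || T <- Topics].
```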

assignments_received/4

assignments_received(Pid::pid(), MemberId::member_id(), GenerationId::integer(), TopicAssignments::brod:received_assignments()) -> ok

Called by the group coordinator when a new assignment is received.

assignments_revoked/1

assignments_revoked(Pid::pid()) -> ok

Called by the group coordinator before re-joining the consumer group.

code_change/3

code_change(OldVsn, State, Extra) -> any()

commit/1

commit(Pid::pid()) -> ok

Commit all acked offsets. NOTE: This is an async call.

commit/4

commit(Pid::pid(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset()) -> ok

Commit an offset for a topic-partition. This is an asynchronous call.

get_committed_offsets/2

get_committed_offsets(Pid::pid(), TopicPartitions::[{brod:topic(), brod:partition()}]) -> {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}

Called by the group coordinator when initializing the assignments for the subscriber.

NOTE: This function is called only when offset_commit_policy is set to consumer_managed in group config.

NOTE: The committed offsets should be the offsets for successfully processed (acknowledged) messages, not the begin_offset to start fetching from.
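
With consumer_managed offsets, the callback module is responsible for returning the stored offsets. The sketch below assumes the callback takes (GroupId, TopicPartitions, CbState) and returns {ok, Offsets, CbState}, and that CbState is a map from {Topic, Partition} to the last processed offset. Both the callback shape and the storage layout are assumptions for illustration.

```erlang
-module(example_offsets).
-export([get_committed_offsets/3]).

%% Return the last processed offset for each requested topic-partition.
%% Partitions with no stored offset are omitted from the result.
get_committed_offsets(_GroupId, TopicPartitions, CbState) ->
  Offsets = [{TP, maps:get(TP, CbState)}
             || TP <- TopicPartitions, maps:is_key(TP, CbState)],
  {ok, Offsets, CbState}.
```

In line with the note above, a stored value of 41 here means the message at offset 41 was processed, so fetching would presumably resume from offset 42.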

handle_call/3

handle_call(Call, From, State) -> any()

handle_cast/2

handle_cast(Cast, State) -> any()

handle_info/2

handle_info(Info, State0) -> any()

init/1

init(X1) -> any()

start_link/7

start_link(Client::brod:client(), GroupId::brod:group_id(), Topics::[brod:topic()], GroupConfig::brod:group_config(), ConsumerConfig::brod:consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Equivalent to start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, message, CbModule, CbInitArg).

start_link/8

start_link(Client::brod:client(), GroupId::brod:group_id(), Topics::[brod:topic()], GroupConfig::brod:group_config(), ConsumerConfig::brod:consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Start (link) a group subscriber.

Client: Client ID (or pid, but not recommended) of the brod client.

GroupId: Consumer group ID, which should be unique per Kafka cluster.

Topics: Predefined set of topic names to join the group.

NOTE: The group leader will collect topics from all members and assign all collected topic-partitions to the members of the group. That is, members can join with an arbitrary set of topics.

GroupConfig: For group coordinator, see brod_group_coordinator:start_link/6

ConsumerConfig: For partition consumer, see brod_consumer:start_link/4

MessageType: The type of message that is going to be handled by the callback module. Can be either message or message_set.

CbModule: Callback module which should have the callback functions implemented for message processing.

CbInitArg: The term() that is passed to CbModule:init/2 as the second argument when initializing the subscriber.
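
To illustrate the MessageType argument, the sketch below starts a subscriber with message_set, so that handle_message/4 receives a whole batch per call. The module name, group ID, and topic are hypothetical. The #kafka_message_set{} record and its messages field come from brod.hrl as best understood, and the {ok, ack, State} return value is assumed to acknowledge the whole set.

```erlang
-module(example_set_subscriber).
-behaviour(brod_group_subscriber).

%% Provides the #kafka_message_set{} record definition.
-include_lib("brod/include/brod.hrl").

-export([start/1]).
-export([init/2, handle_message/4]).

%% CbInitArg is used here as the initial message count.
init(_GroupId, InitialCount) ->
  {ok, InitialCount}.

%% With MessageType = message_set, the third argument is a batch of messages.
handle_message(_Topic, _Partition,
               #kafka_message_set{messages = Messages}, Count) ->
  {ok, ack, Count + length(Messages)}.

start(Client) ->
  brod_group_subscriber:start_link(Client,
                                   <<"example-group">>,    % hypothetical
                                   [<<"example-topic">>],  % hypothetical
                                   _GroupConfig = [],
                                   _ConsumerConfig = [{begin_offset, earliest}],
                                   message_set,
                                   ?MODULE,
                                   _CbInitArg = 0).
```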

stop/1

stop(Pid::pid()) -> ok

Stop the group subscriber and wait for the pid's DOWN message before returning.

terminate/2

terminate(Reason, State) -> any()

user_data/1

user_data(Pid) -> any()


Generated by EDoc