Behaviours: gen_server.
config() = brod:group_config()
protocol_name() = string()
ack/5 | Called by a group member to acknowledge a consumed message offset. |
code_change/3 | |
commit_offsets/1 | Force commit collected (acked) offsets immediately. |
commit_offsets/2 | Force commit collected (acked) offsets plus the given extra offsets immediately. |
handle_call/3 | |
handle_cast/2 | |
handle_info/2 | |
init/1 | |
start_link/6 | Start a Kafka consumer group coordinator. |
terminate/2 | |
update_topics/2 | Update the list of topics the brod_group_coordinator follows, which triggers a join group rebalance. |
ack(Pid::pid(), GenerationId::integer(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset()) -> ok
Called by a group member to acknowledge a consumed message offset.
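As a rough sketch of how a member might use this call, the helper below acknowledges a batch of already-processed offsets for one topic-partition. The module name ack_sketch and the function ack_processed/5 are hypothetical, and GenerationId is assumed to be the generation id the member holds for its current assignment.

```erlang
-module(ack_sketch).
-export([ack_processed/5]).

%% Hypothetical helper, not part of brod.
%% Acknowledge every offset in Offsets for one topic-partition, in
%% ascending order. Offsets is assumed to contain only offsets of
%% messages the member has finished processing.
-spec ack_processed(pid(), integer(), binary(), integer(), [integer()]) -> ok.
ack_processed(Coordinator, GenerationId, Topic, Partition, Offsets) ->
    lists:foreach(
      fun(Offset) ->
              %% ack/5 is documented to return ok.
              ok = brod_group_coordinator:ack(Coordinator, GenerationId,
                                              Topic, Partition, Offset)
      end,
      lists:sort(Offsets)).
```

Acknowledging in ascending order is a choice made for this sketch, not something the page requires.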
code_change(OldVsn, State, Extra) -> any()
commit_offsets(CoordinatorPid::pid()) -> ok | {error, any()}
Force commit collected (acked) offsets immediately.
commit_offsets(CoordinatorPid::pid(), Offsets0::[{{brod:topic(), brod:partition()}, brod:offset()}]) -> ok | {error, any()}
Force commit collected (acked) offsets plus the given extra offsets immediately.
NOTE: lists:usort/1 is applied to the given extra offsets before committing,
meaning that if two or more offsets for the same topic-partition exist
in the list, only the one closest to the head of the list is kept.
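The de-duplication described in the note can be illustrated in isolation. The sketch below keeps one offset per topic-partition, preferring the entry closest to the head of the list. It uses lists:ukeysort/2 keyed on the topic-partition, because that standard-library function has exactly this keep-first behaviour. Whether the coordinator calls this function internally is an assumption, and the module and function names are hypothetical.

```erlang
-module(offsets_sketch).
-export([dedupe/1]).

%% Hypothetical helper, not part of brod.
%% Keep a single {{Topic, Partition}, Offset} entry per topic-partition.
%% lists:ukeysort/2 sorts by the given tuple element (here the
%% {Topic, Partition} key) and, among entries with equal keys, keeps
%% only the first one in the original list order.
-spec dedupe([{{binary(), integer()}, integer()}]) ->
          [{{binary(), integer()}, integer()}].
dedupe(Offsets) ->
    lists:ukeysort(1, Offsets).

%% Example in the shell:
%% 1> offsets_sketch:dedupe([{{<<"t">>, 0}, 42},
%%                           {{<<"t">>, 1}, 7},
%%                           {{<<"t">>, 0}, 10}]).
%% [{{<<"t">>,0},42},{{<<"t">>,1},7}]
```

A caller who wants a particular offset to win for a topic-partition can therefore place it before any other entry for that topic-partition in the list passed to commit_offsets/2.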
handle_call(Call, From, State) -> any()
handle_cast(Cast, State) -> any()
handle_info(Info, State) -> any()
init(X1) -> any()
start_link(Client::brod:client(), GroupId::brod:group_id(), Topics::[brod:topic()], Config::config(), CbModule::module(), MemberPid::pid()) -> {ok, pid()} | {error, any()}
Start a Kafka consumer group coordinator.
Client
: ClientId (or pid, but not recommended)
GroupId
: Predefined globally unique (in a Kafka cluster) binary string.
Topics
: Predefined set of topic names to join the group.
CbModule
: The module which implements group coordinator callbacks.
MemberPid
: The member process pid.
Config
: The group coordinator configs in a proplist, possible entries:
partition_assignment_strategy (optional, default = roundrobin_v2)
: Possible values:
  roundrobin_v2 (topic-sticky)
  : Take all topic-partitions (a topic_partition() list) and assign one
    to each member in a roundrobin fashion. Only partitions in the
    subscription topic list are assigned.
  callback_implemented
  : Call CbModule:assign_partitions/2 to assign partitions.
session_timeout_seconds (optional, default = 10)
heartbeat_rate_seconds (optional, default = 2)
max_rejoin_attempts (optional, default = 5)
rejoin_delay_seconds (optional, default = 1)
offset_commit_policy (optional, default = commit_to_kafka_v2)
: Possible values:
  commit_to_kafka_v2
  : Group coordinator will commit the offsets to Kafka using version 2
    OffsetCommitRequest.
  consumer_managed
  : The group member (e.g. brod_group_subscriber.erl) is responsible for
    persisting offsets to a local or centralized storage. The callback
    get_committed_offsets should be implemented to allow the group
    coordinator to retrieve the committed offsets.
offset_commit_interval_seconds (optional, default = 5)
: Irrelevant if offset_commit_policy is consumer_managed.
offset_retention_seconds (optional, default = -1)
: Irrelevant if offset_commit_policy is consumer_managed.
protocol_name (optional, default = roundrobin_v2)
: If not given, partition_assignment_strategy is used as the protocol name.
  Setting a protocol name allows interaction with consumer group members
  written in other programming languages. For example, 'range' is the most
  commonly used protocol name for the Java client. However, brod only
  supports the roundrobin protocol out of the box; to mimic the 'range'
  protocol, one has to implement it via the callback_implemented
  assignment strategy.
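To make the Config proplist concrete, here is a minimal sketch that spells out the defaults listed above and passes them to start_link/6. The module name coordinator_sketch, the group id <<"example-group">> and the topic <<"example-topic">> are hypothetical. CbModule is assumed to be a module that already implements the group coordinator callbacks, which are not shown here.

```erlang
-module(coordinator_sketch).
-export([group_config/0, start/3]).

%% Group coordinator config as a proplist. Every entry is optional;
%% the values below simply repeat the documented defaults.
-spec group_config() -> [{atom(), term()}].
group_config() ->
    [ {partition_assignment_strategy, roundrobin_v2}
    , {session_timeout_seconds, 10}
    , {heartbeat_rate_seconds, 2}
    , {max_rejoin_attempts, 5}
    , {rejoin_delay_seconds, 1}
    , {offset_commit_policy, commit_to_kafka_v2}
    , {offset_commit_interval_seconds, 5}
    , {offset_retention_seconds, -1}
    ].

%% Start a coordinator for a hypothetical group and topic.
%% ClientId must refer to an already started brod client, and
%% MemberPid is the group member process the coordinator serves.
-spec start(term(), module(), pid()) -> {ok, pid()} | {error, any()}.
start(ClientId, CbModule, MemberPid) ->
    GroupId = <<"example-group">>,   %% hypothetical group id
    Topics = [<<"example-topic">>],  %% hypothetical topic list
    brod_group_coordinator:start_link(ClientId, GroupId, Topics,
                                      group_config(), CbModule, MemberPid).
```

Since the listed values are the defaults, passing an empty list as Config should behave the same way; the explicit proplist is only there to show the shape of each entry.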
terminate(Reason, State) -> any()
update_topics(CoordinatorPid::pid(), Topics::[brod:topic()]) -> ok
Update the list of topics the brod_group_coordinator follows, which triggers a join group rebalance.
Generated by EDoc