View Source brod_group_coordinator (brod v4.0.0)
Summary
Functions
Force commit collected (acked) offsets plus the given extra offsets immediately.
Start a kafka consumer group coordinator.
DOWN
before return.Types
-type brod_offset_commit_policy() :: commit_to_kafka_v2 | consumer_managed.
-type brod_partition_assignment_strategy() :: roundrobin_v2 | callback_implemented.
-type config() :: brod:group_config().
-type member() :: brod:group_member().
-type member_id() :: brod:group_member_id().
-type offset_commit_policy() :: brod_offset_commit_policy().
-type partition_assignment_strategy() :: brod_partition_assignment_strategy().
-type protocol_name() :: string().
-type state() :: #state{client :: brod:client(), groupId :: brod:group_id(), memberId :: member_id(), leaderId :: undefined | member_id(), generationId :: integer(), topics :: [brod:topic()], connection :: undefined | kpro:connection(), hb_ref :: undefined | {reference(), ts()}, members :: [member()], is_in_group :: boolean(), member_pid :: pid(), member_module :: module(), acked_offsets :: [{{brod:topic(), brod:partition()}, brod:offset()}], offset_commit_timer :: undefined | reference(), partition_assignment_strategy :: partition_assignment_strategy(), session_timeout_seconds :: pos_integer(), rebalance_timeout_seconds :: pos_integer(), heartbeat_rate_seconds :: pos_integer(), max_rejoin_attempts :: non_neg_integer(), rejoin_delay_seconds :: non_neg_integer(), offset_retention_seconds :: undefined | integer(), offset_commit_policy :: offset_commit_policy(), offset_commit_interval_seconds :: pos_integer(), protocol_name :: protocol_name()}.
-type ts() :: erlang:timestamp().
Functions
-spec ack(pid(), integer(), brod:topic(), brod:partition(), brod:offset()) -> ok.
-spec commit_offsets(pid(), [{{brod:topic(), brod:partition()}, brod:offset()}]) -> ok | {error, any()}.
Force commit collected (acked) offsets plus the given extra offsets immediately.
NOTE:lists:usort/1
is applied on the given extra offsets to commit, meaning if two or more offsets for the same topic-partition exist in the list, only the one closest the head of the list is kept
-spec start_link(brod:client(), brod:group_id(), [brod:topic()], config(), module(), pid()) -> {ok, pid()} | {error, any()}.
Start a kafka consumer group coordinator.
Client
: ClientId
(or pid, but not recommended)
GroupId
: Predefined globally unique (in a kafka cluster) binary string.
Topics
: Predefined set of topic names to join the group.
CbModule
: The module which implements group coordinator callbacks
MemberPid
: The member process pid.
Config
: The group coordinator configs in a proplist, possible entries:
partition_assignment_strategy
(optional, default =roundrobin_v2
)Possible values:
Take all topic-offset (sortedroundrobin_v2
(topic-sticky)topic_partition()
list), assign one to each member in a roundrobin fashion. Only partitions in the subscription topic list are assigned.
Callcallback_implemented
CbModule:assign_partitions/2
to assign partitions.
Time in seconds for the group coordinator broker to consider a member 'down' if no heartbeat or any kind of requests received from a broker in the past N seconds. A group member may also consider the coordinator broker 'down' if no heartbeat response response received in the past N seconds.session_timeout_seconds
(optional, default = 30)
Time in seconds for each worker to join the group once a rebalance has begun. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.rebalance_timeout_seconds
(optional, default = 30)
Time in seconds for the member to 'ping' the group coordinator. OBS: Care should be taken when picking the number, on one hand, we do not want to flush the broker with requests if we set it too low, on the other hand, if set it too high, it may take too long for the members to realise status changes of the group such as assignment rebalacing or group coordinator switchover etc.heartbeat_rate_seconds
(optional, default = 5)
Maximum number of times allowed for a member to re-join the group. The gen_server will stop if it reached the maximum number of retries. OBS: 'let it crash' may not be the optimal strategy here because the group member id is kept in the gen_server looping state and it is reused when re-joining the group.max_rejoin_attempts
(optional, default = 5)
Delay in seconds before re-joining the group.rejoin_delay_seconds
(optional, default = 1)
How/where to commit offsets, possible values:offset_commit_policy
(optional, default =commit_to_kafka_v2
)commit_to_kafka_v2
: Group coordinator will commit the offsets to kafka using version 2 OffsetCommitRequest.consumer_managed
: The group member (e.g. brod_group_subscriber.erl) is responsible for persisting offsets to a local or centralized storage. And the callbackget_committed_offsets
should be implemented to allow group coordinator to retrieve the committed offsets.
The time interval between two OffsetCommitRequest messages. This config is irrelevant ifoffset_commit_interval_seconds
(optional, default = 5)offset_commit_policy
isconsumer_managed
.
How long the time is to be kept in kafka before it is deleted. The default special value -1 indicates that the __consumer_offsets topic retention policy is used. This config is irrelevant ifoffset_retention_seconds
(optional, default = -1)offset_commit_policy
isconsumer_managed
.
This is the protocol name used when join a group, if not given, by defaultprotocol_name
(optional, default = roundrobin_v2)partition_assignment_strategy
is used as the protocol name. Setting a protocol name allows to interact with consumer group members designed in other programming languages. For example, 'range' is the most commonly used protocol name for JAVA client. However, brod only supports roundrobin protocol out of the box, in order to mimic 'range' protocol one will have to do it viacallback_implemented
assignment strategy
-spec stop(pid()) -> ok.
DOWN
before return.
-spec update_topics(pid(), [brod:topic()]) -> ok.