View Source brod_group_coordinator (brod v4.0.0)

Summary

Functions

For group member to call to acknowledge a consumed message offset.
Force commit collected (acked) offsets immediately.

Force commit collected (acked) offsets plus the given extra offsets immediately.

Start a kafka consumer group coordinator.

Stop group coordinator, wait for pid DOWN before return.
Update the list of topics the brod_group_coordinator follow which triggers a join group rebalance

Types

Link to this type

brod_offset_commit_policy/0

View Source
-type brod_offset_commit_policy() :: commit_to_kafka_v2 | consumer_managed.
Link to this type

brod_partition_assignment_strategy/0

View Source
-type brod_partition_assignment_strategy() :: roundrobin_v2 | callback_implemented.
-type config() :: brod:group_config().
-type member() :: brod:group_member().
-type member_id() :: brod:group_member_id().
Link to this type

offset_commit_policy/0

View Source
-type offset_commit_policy() :: brod_offset_commit_policy().
Link to this type

partition_assignment_strategy/0

View Source
-type partition_assignment_strategy() :: brod_partition_assignment_strategy().
-type protocol_name() :: string().
-type state() ::
          #state{client :: brod:client(),
                 groupId :: brod:group_id(),
                 memberId :: member_id(),
                 leaderId :: undefined | member_id(),
                 generationId :: integer(),
                 topics :: [brod:topic()],
                 connection :: undefined | kpro:connection(),
                 hb_ref :: undefined | {reference(), ts()},
                 members :: [member()],
                 is_in_group :: boolean(),
                 member_pid :: pid(),
                 member_module :: module(),
                 acked_offsets :: [{{brod:topic(), brod:partition()}, brod:offset()}],
                 offset_commit_timer :: undefined | reference(),
                 partition_assignment_strategy :: partition_assignment_strategy(),
                 session_timeout_seconds :: pos_integer(),
                 rebalance_timeout_seconds :: pos_integer(),
                 heartbeat_rate_seconds :: pos_integer(),
                 max_rejoin_attempts :: non_neg_integer(),
                 rejoin_delay_seconds :: non_neg_integer(),
                 offset_retention_seconds :: undefined | integer(),
                 offset_commit_policy :: offset_commit_policy(),
                 offset_commit_interval_seconds :: pos_integer(),
                 protocol_name :: protocol_name()}.
-type ts() :: erlang:timestamp().

Functions

Link to this function

ack(Pid, GenerationId, Topic, Partition, Offset)

View Source
-spec ack(pid(), integer(), brod:topic(), brod:partition(), brod:offset()) -> ok.
For group member to call to acknowledge a consumed message offset.
Link to this function

code_change(OldVsn, State, Extra)

View Source
Link to this function

commit_offsets(CoordinatorPid)

View Source
-spec commit_offsets(pid()) -> ok | {error, any()}.
Force commit collected (acked) offsets immediately.
Link to this function

commit_offsets(CoordinatorPid, Offsets0)

View Source
-spec commit_offsets(pid(), [{{brod:topic(), brod:partition()}, brod:offset()}]) -> ok | {error, any()}.

Force commit collected (acked) offsets plus the given extra offsets immediately.

NOTE: lists:usort/1 is applied on the given extra offsets to commit, meaning if two or more offsets for the same topic-partition exist in the list, only the one closest the head of the list is kept
Link to this function

handle_call(Call, From, State)

View Source
Link to this function

handle_cast(Cast, State)

View Source
Link to this function

handle_info(Info, State)

View Source
Link to this function

start_link(Client, GroupId, Topics, Config, CbModule, MemberPid)

View Source
-spec start_link(brod:client(), brod:group_id(), [brod:topic()], config(), module(), pid()) ->
                    {ok, pid()} | {error, any()}.

Start a kafka consumer group coordinator.

Client: ClientId (or pid, but not recommended)

GroupId: Predefined globally unique (in a kafka cluster) binary string.

Topics: Predefined set of topic names to join the group.

CbModule: The module which implements group coordinator callbacks

MemberPid: The member process pid.

Config: The group coordinator configs in a proplist, possible entries:

  • partition_assignment_strategy (optional, default = roundrobin_v2)

    Possible values:

    • roundrobin_v2 (topic-sticky)

      Take all topic-offset (sorted topic_partition() list), assign one to each member in a roundrobin fashion. Only partitions in the subscription topic list are assigned.
    • callback_implemented

      Call CbModule:assign_partitions/2 to assign partitions.
  • session_timeout_seconds (optional, default = 30)

    Time in seconds for the group coordinator broker to consider a member 'down' if no heartbeat or any kind of requests received from a broker in the past N seconds. A group member may also consider the coordinator broker 'down' if no heartbeat response response received in the past N seconds.
  • rebalance_timeout_seconds (optional, default = 30)

    Time in seconds for each worker to join the group once a rebalance has begun. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.
  • heartbeat_rate_seconds (optional, default = 5)

    Time in seconds for the member to 'ping' the group coordinator. OBS: Care should be taken when picking the number, on one hand, we do not want to flush the broker with requests if we set it too low, on the other hand, if set it too high, it may take too long for the members to realise status changes of the group such as assignment rebalacing or group coordinator switchover etc.
  • max_rejoin_attempts (optional, default = 5)

    Maximum number of times allowed for a member to re-join the group. The gen_server will stop if it reached the maximum number of retries. OBS: 'let it crash' may not be the optimal strategy here because the group member id is kept in the gen_server looping state and it is reused when re-joining the group.
  • rejoin_delay_seconds (optional, default = 1)

    Delay in seconds before re-joining the group.
  • offset_commit_policy (optional, default = commit_to_kafka_v2)

    How/where to commit offsets, possible values:
    • commit_to_kafka_v2: Group coordinator will commit the offsets to kafka using version 2 OffsetCommitRequest.
    • consumer_managed: The group member (e.g. brod_group_subscriber.erl) is responsible for persisting offsets to a local or centralized storage. And the callback get_committed_offsets should be implemented to allow group coordinator to retrieve the committed offsets.
  • offset_commit_interval_seconds (optional, default = 5)

    The time interval between two OffsetCommitRequest messages. This config is irrelevant if offset_commit_policy is consumer_managed.
  • offset_retention_seconds (optional, default = -1)

    How long the time is to be kept in kafka before it is deleted. The default special value -1 indicates that the __consumer_offsets topic retention policy is used. This config is irrelevant if offset_commit_policy is consumer_managed.
  • protocol_name (optional, default = roundrobin_v2)

    This is the protocol name used when join a group, if not given, by default partition_assignment_strategy is used as the protocol name. Setting a protocol name allows to interact with consumer group members designed in other programming languages. For example, 'range' is the most commonly used protocol name for JAVA client. However, brod only supports roundrobin protocol out of the box, in order to mimic 'range' protocol one will have to do it via callback_implemented assignment strategy
-spec stop(pid()) -> ok.
Stop group coordinator, wait for pid DOWN before return.
Link to this function

terminate(Reason, State)

View Source
Link to this function

update_topics(CoordinatorPid, Topics)

View Source
-spec update_topics(pid(), [brod:topic()]) -> ok.
Update the list of topics the brod_group_coordinator follow which triggers a join group rebalance