View Source KafkaEx.Cluster.ClusterMetadata (kafka_ex v1.0.0-rc.1)

Encapsulates what we know about the state of a Kafka broker cluster

This module is mainly used internally in New.Client, but some of its functions may be useful for extracting metadata information

Summary

Types

Possible errors given by select_node/2

t()

Functions

Returns a t:Broker.t/0 for the given t:KafkaExAPI.node_id/0 or nil if there is no known broker with that node id

Return a list of the known brokers

List names of topics known by the cluster metadata

Merge two sets of cluster metadata with their brokers. Returns the merged metadata and a list of brokers that should have their connections closed because they are not present in the new metadata

remove the given topics (e.g., when they are deleted)

Find the node id for a given selector

Return the metadata for the given topics

Execute an update callback on each broker

Types

@type node_select_error() ::
  :no_such_node | :no_such_topic | :no_such_partition | :no_such_consumer_group

Possible errors given by select_node/2

@type t() :: %KafkaEx.Cluster.ClusterMetadata{
  brokers: %{required(KafkaEx.API.node_id()) => KafkaEx.Cluster.Broker.t()},
  consumer_group_coordinators: %{
    required(KafkaEx.API.consumer_group_name()) => KafkaEx.API.node_id()
  },
  controller_id: KafkaEx.API.node_id() | nil,
  topics: %{required(KafkaEx.API.topic_name()) => KafkaEx.Cluster.Topic.t()}
}

Functions

Link to this function

broker_by_node_id(cluster_metadata, node_id)

View Source
@spec broker_by_node_id(t(), KafkaEx.API.node_id()) ::
  KafkaEx.Cluster.Broker.t() | nil

Returns a t:Broker.t/0 for the given t:KafkaExAPI.node_id/0 or nil if there is no known broker with that node id

Link to this function

brokers(cluster_metadata)

View Source
@spec brokers(t()) :: [KafkaEx.Cluster.Broker.t()]

Return a list of the known brokers

Link to this function

known_topics(cluster_metadata)

View Source
@spec known_topics(t()) :: [KafkaEx.API.topic_name()]

List names of topics known by the cluster metadata

Tthis is a subset of the topics in the cluster - it will only contain topics for which we have fetched metadata

Link to this function

merge_brokers(old_cluster_metadata, new_cluster_metadata)

View Source
@spec merge_brokers(t(), t()) :: {t(), [KafkaEx.Cluster.Broker.t()]}

Merge two sets of cluster metadata with their brokers. Returns the merged metadata and a list of brokers that should have their connections closed because they are not present in the new metadata

Link to this function

put_consumer_group_coordinator(cluster_metadata, consumer_group, coordinator_node_id)

View Source
@spec put_consumer_group_coordinator(
  t(),
  KafkaEx.API.consumer_group_name(),
  KafkaEx.API.node_id()
) ::
  t()

update a consumer group coordinator node id

Link to this function

remove_topics(cluster_metadata, topics_to_remove)

View Source
@spec remove_topics(t(), [KafkaEx.API.topic_name()]) :: t()

remove the given topics (e.g., when they are deleted)

Link to this function

select_node(cluster_metadata, node_selector)

View Source
@spec select_node(t(), KafkaEx.Client.NodeSelector.t()) ::
  {:ok, KafkaEx.API.node_id()} | {:error, node_select_error()}

Find the node id for a given selector

Note this will not update the metadata, only select a node given the current metadata.

See KafkaEx.Client.NodeSelector.t/0

Link to this function

topics_metadata(cluster_metadata, wanted_topics)

View Source
@spec topics_metadata(t(), [KafkaEx.API.topic_name()]) :: [KafkaEx.Cluster.Topic.t()]

Return the metadata for the given topics

Link to this function

update_brokers(cluster_metadata, cb)

View Source
@spec update_brokers(t(), (KafkaEx.Cluster.Broker.t() -> KafkaEx.Cluster.Broker.t())) ::
  t()

Execute an update callback on each broker