View Source brod_client (brod v4.3.2)

A brod_client in brod is a gen_server responsible for establishing and maintaining tcp sockets connecting to kafka brokers. It also manages per-topic-partition producer and consumer processes under two-level supervision trees.

You can start clients automatically at application startup or on demand. See the overview for examples.

Summary

Functions

De-register the consumer for a partition.

De-register the producer for a partition.

Get connection to a kafka broker.

Get consumer of the given topic-parition.
Get broker endpoint and connection config for connecting a group coordinator.

Get the connection to kafka broker which is a leader for given Topic-Partition.

Get topic metadata, if topic is undefined, will fetch ALL metadata.
Ensure not topic auto creation even if Kafka has it enabled.
Get number of partitions for a given topic.
Get number of partitions for an existing topic. Ensured not to auto create a topic even when Kafka is configured with topic auto creation enabled.

Get producer of the given topic-partition.

Get broker endpoint and connection config for connecting a transactional coordinator.

Register self() as a partition consumer.

Register self() as a partition producer.

Dynamically start a topic consumer.

Dynamically start a per-topic producer.

Stop all partition consumers of the given topic.
Stop all partition producers of the given topic.

Types

-type client() :: brod:client().
-type client_id() :: brod:client_id().
-type config() :: proplists:proplist().
-type conn_state() :: #conn{endpoint :: endpoint(), pid :: connection() | dead_conn()}.
-type connection() :: kpro:connection().
-type dead_conn() :: {dead_since, erlang:timestamp(), any()}.
-type endpoint() :: brod:endpoint().
-type get_consumer_error() ::
          client_down |
          {client_down, any()} |
          {consumer_down, any()} |
          {consumer_not_found, topic()} |
          {consumer_not_found, topic(), partition()}.
-type get_producer_error() ::
          client_down |
          {client_down, any()} |
          {producer_down, any()} |
          {producer_not_found, topic()} |
          {producer_not_found, topic(), partition()}.
-type get_worker_error() :: get_producer_error() | get_consumer_error().
-type group_id() :: brod:group_id().
-type partition() :: brod:partition().
Link to this type

partition_worker_key/0

View Source
-type partition_worker_key() :: {producer, topic(), partition()} | {consumer, topic(), partition()}.
-type state() ::
          #state{client_id :: client_id(),
                 bootstrap_endpoints :: [endpoint()],
                 meta_conn :: undefined | connection(),
                 payload_conns :: [conn_state()],
                 producers_sup :: undefined | pid(),
                 consumers_sup :: undefined | pid(),
                 config :: undefined | config(),
                 workers_tab :: undefined | ets:tab()}.
-type topic() :: brod:topic().
-type transactional_id() :: brod:transactional_id().

Functions

Link to this function

deregister_consumer(Client, Topic, Partition)

View Source
-spec deregister_consumer(client(), topic(), partition()) -> ok.

De-register the consumer for a partition.

The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully %% stopped consumers and allow later restart.
Link to this function

deregister_producer(Client, Topic, Partition)

View Source
-spec deregister_producer(client(), topic(), partition()) -> ok.

De-register the producer for a partition.

The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.
Link to this function

find_consumer(Client, Topic, Partition)

View Source
-spec find_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
Link to this function

find_producer(Client, Topic, Partition)

View Source
-spec find_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.
-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}.
Link to this function

get_connection(Client, Host, Port)

View Source
-spec get_connection(client(), brod:hostname(), brod:portnum()) -> {ok, pid()} | {error, any()}.

Get connection to a kafka broker.

Return already established connection towards the broker, otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago, {error, LastReason} is returned.
Link to this function

get_consumer(Client, Topic, Partition)

View Source
-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
Get consumer of the given topic-parition.
Link to this function

get_group_coordinator(Client, GroupId)

View Source
-spec get_group_coordinator(client(), group_id()) ->
                               {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
Get broker endpoint and connection config for connecting a group coordinator.
Link to this function

get_leader_connection(Client, Topic, Partition)

View Source
-spec get_leader_connection(client(), topic(), partition()) -> {ok, pid()} | {error, any()}.

Get the connection to kafka broker which is a leader for given Topic-Partition.

Return already established connection towards the leader broker, Otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago, {error, LastReason} is returned.
Link to this function

get_metadata(Client, Topic)

View Source
-spec get_metadata(client(), all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}.
Get topic metadata, if topic is undefined, will fetch ALL metadata.
Link to this function

get_metadata_safe(Client, Topic)

View Source
-spec get_metadata_safe(client(), topic()) -> {ok, kpro:struct()} | {error, any()}.
Ensure not topic auto creation even if Kafka has it enabled.
Link to this function

get_partitions_count(Client, Topic)

View Source
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Get number of partitions for a given topic.
Link to this function

get_partitions_count_safe(Client, Topic)

View Source
-spec get_partitions_count_safe(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Get number of partitions for an existing topic. Ensured not to auto create a topic even when Kafka is configured with topic auto creation enabled.
Link to this function

get_producer(Client, Topic, Partition)

View Source
-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.

Get producer of the given topic-partition.

The producer is started if auto_start_producers is enabled in client config.
Link to this function

get_transactional_coordinator(Client, TransactionId)

View Source
-spec get_transactional_coordinator(client(), transactional_id()) ->
                                       {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
Get broker endpoint and connection config for connecting a transactional coordinator.
Link to this function

lookup_partitions_count_cache(Ets, Topic)

View Source
-spec lookup_partitions_count_cache(ets:tab(), undefined | topic()) ->
                                       {ok, pos_integer()} | {error, any()} | false.
Link to this function

register_consumer(Client, Topic, Partition)

View Source
-spec register_consumer(client(), topic(), partition()) -> ok.

Register self() as a partition consumer.

The pid is registered in an ETS table, then the callers may lookup a consumer pid from the table and make subscribe calls to the process directly.
Link to this function

register_producer(Client, Topic, Partition)

View Source
-spec register_producer(client(), topic(), partition()) -> ok.

Register self() as a partition producer.

The pid is registered in an ETS table, then the callers may lookup a producer pid from the table and make produce requests to the producer process directly.
Link to this function

start_consumer(Client, TopicName, ConsumerConfig)

View Source
-spec start_consumer(client(), topic(), brod:consumer_config()) -> ok | {error, any()}.

Dynamically start a topic consumer.

Returns ok if the consumer is already started.
Link to this function

start_link(BootstrapEndpoints, ClientId, Config)

View Source
-spec start_link([endpoint()], client_id(), config()) -> {ok, pid()} | {error, any()}.
Link to this function

start_producer(Client, TopicName, ProducerConfig)

View Source
-spec start_producer(client(), topic(), brod:producer_config()) -> ok | {error, any()}.

Dynamically start a per-topic producer.

Return ok if the producer is already started.
-spec stop(client()) -> ok.
Link to this function

stop_consumer(Client, TopicName)

View Source
-spec stop_consumer(client(), topic()) -> ok | {error, any()}.
Stop all partition consumers of the given topic.
Link to this function

stop_producer(Client, TopicName)

View Source
-spec stop_producer(client(), topic()) -> ok | {error, any()}.
Stop all partition producers of the given topic.