View Source brod_client (brod v4.4.0)

A brod_client in brod is a gen_server responsible for establishing and maintaining tcp sockets connecting to kafka brokers. It also manages per-topic-partition producer and consumer processes under two-level supervision trees.

You can start clients automatically at application startup or on demand. See the overview for examples.

Summary

Functions

De-register the consumer for a partition.

De-register the producer for a partition.

Get connection to a kafka broker.

Get consumer of the given topic-partition.

Get broker endpoint and connection config for connecting a group coordinator.

Get the connection to kafka broker which is a leader for given Topic-Partition.

Get topic metadata, if topic is undefined, will fetch ALL metadata.

Ensure not topic auto creation even if Kafka has it enabled.

Get number of partitions for a given topic.

Get number of partitions for an existing topic. Ensured not to auto create a topic even when Kafka is configured with topic auto creation enabled.

Get producer of the given topic-partition.

Get broker endpoint and connection config for connecting a transactional coordinator.

Register self() as a partition consumer.

Register self() as a partition producer.

Dynamically start a topic consumer.

Dynamically start a per-topic producer.

Stop all partition consumers of the given topic.

Stop all partition producers of the given topic.

Types

client/0

-type client() :: brod:client().

client_id/0

-type client_id() :: brod:client_id().

config/0

-type config() :: proplists:proplist().

conn_state/0

-type conn_state() :: #conn{endpoint :: endpoint(), pid :: connection() | dead_conn()}.

connection/0

-type connection() :: kpro:connection().

dead_conn/0

-type dead_conn() :: {dead_since, erlang:timestamp(), any()}.

endpoint/0

-type endpoint() :: brod:endpoint().

get_consumer_error/0

-type get_consumer_error() ::
          client_down |
          {client_down, any()} |
          {consumer_down, any()} |
          {consumer_not_found, topic()} |
          {consumer_not_found, topic(), partition()}.

get_producer_error/0

-type get_producer_error() ::
          client_down |
          {client_down, any()} |
          {producer_down, any()} |
          {producer_not_found, topic()} |
          {producer_not_found, topic(), partition()}.

get_worker_error/0

-type get_worker_error() :: get_producer_error() | get_consumer_error().

group_id/0

-type group_id() :: brod:group_id().

partition/0

-type partition() :: brod:partition().

partition_worker_key/0

-type partition_worker_key() :: {producer, topic(), partition()} | {consumer, topic(), partition()}.

state/0

-type state() ::
          #state{client_id :: client_id(),
                 bootstrap_endpoints :: [endpoint()],
                 meta_conn :: undefined | connection(),
                 payload_conns :: [conn_state()],
                 producers_sup :: undefined | pid(),
                 consumers_sup :: undefined | pid(),
                 config :: undefined | config(),
                 workers_tab :: undefined | ets:tab()}.

topic/0

-type topic() :: brod:topic().

transactional_id/0

-type transactional_id() :: brod:transactional_id().

Functions

deregister_consumer(Client, Topic, Partition)

-spec deregister_consumer(client(), topic(), partition()) -> ok.

De-register the consumer for a partition.

The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully %% stopped consumers and allow later restart.

deregister_producer(Client, Topic, Partition)

-spec deregister_producer(client(), topic(), partition()) -> ok.

De-register the producer for a partition.

The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.

find_consumer(Client, Topic, Partition)

-spec find_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.

find_producer(Client, Topic, Partition)

-spec find_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.

get_bootstrap(Client)

-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}.

get_connection(Client, Host, Port)

-spec get_connection(client(), brod:hostname(), brod:portnum()) -> {ok, pid()} | {error, any()}.

Get connection to a kafka broker.

Return already established connection towards the broker, otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago, {error, LastReason} is returned.

get_consumer(Client, Topic, Partition)

-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.

Get consumer of the given topic-partition.

get_group_coordinator(Client, GroupId)

-spec get_group_coordinator(client(), group_id()) ->
                               {ok, {endpoint(), brod:conn_config()}} | {error, any()}.

Get broker endpoint and connection config for connecting a group coordinator.

get_leader_connection(Client, Topic, Partition)

-spec get_leader_connection(client(), topic(), partition()) -> {ok, pid()} | {error, any()}.

Get the connection to kafka broker which is a leader for given Topic-Partition.

Return already established connection towards the leader broker, Otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago, {error, LastReason} is returned.

get_metadata(Client, Topic)

-spec get_metadata(client(), all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}.

Get topic metadata, if topic is undefined, will fetch ALL metadata.

get_metadata_safe(Client, Topic)

-spec get_metadata_safe(client(), topic()) -> {ok, kpro:struct()} | {error, any()}.

Ensure not topic auto creation even if Kafka has it enabled.

get_partitions_count(Client, Topic)

-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.

Get number of partitions for a given topic.

get_partitions_count_safe(Client, Topic)

-spec get_partitions_count_safe(client(), topic()) -> {ok, pos_integer()} | {error, any()}.

Get number of partitions for an existing topic. Ensured not to auto create a topic even when Kafka is configured with topic auto creation enabled.

get_producer(Client, Topic, Partition)

-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.

Get producer of the given topic-partition.

The producer is started if auto_start_producers is enabled in client config.

get_transactional_coordinator(Client, TransactionId)

-spec get_transactional_coordinator(client(), transactional_id()) ->
                                       {ok, {endpoint(), brod:conn_config()}} | {error, any()}.

Get broker endpoint and connection config for connecting a transactional coordinator.

lookup_partitions_count_cache(Ets, Topic)

-spec lookup_partitions_count_cache(ets:tab(), undefined | topic()) ->
                                       {ok, pos_integer()} | {error, any()} | false.

register_consumer(Client, Topic, Partition)

-spec register_consumer(client(), topic(), partition()) -> ok.

Register self() as a partition consumer.

The pid is registered in an ETS table, then the callers may lookup a consumer pid from the table and make subscribe calls to the process directly.

register_producer(Client, Topic, Partition)

-spec register_producer(client(), topic(), partition()) -> ok.

Register self() as a partition producer.

The pid is registered in an ETS table, then the callers may lookup a producer pid from the table and make produce requests to the producer process directly.

start_consumer(Client, TopicName, ConsumerConfig)

-spec start_consumer(client(), topic(), brod:consumer_config()) -> ok | {error, any()}.

Dynamically start a topic consumer.

Returns ok if the consumer is already started.

start_link(BootstrapEndpoints, ClientId, Config)

-spec start_link([endpoint()], client_id(), config()) -> {ok, pid()} | {error, any()}.

start_producer(Client, TopicName, ProducerConfig)

-spec start_producer(client(), topic(), brod:producer_config()) -> ok | {error, any()}.

Dynamically start a per-topic producer.

Return ok if the producer is already started.

stop(Client)

-spec stop(client()) -> ok.

stop_consumer(Client, TopicName)

-spec stop_consumer(client(), topic()) -> ok | {error, any()}.

Stop all partition consumers of the given topic.

stop_producer(Client, TopicName)

-spec stop_producer(client(), topic()) -> ok | {error, any()}.

Stop all partition producers of the given topic.