brod_client (brod v4.0.0)
A brod_client in brod is a gen_server responsible for establishing and maintaining TCP sockets connecting to Kafka brokers. It also manages per-topic-partition producer and consumer processes under two-level supervision trees.
Summary
Functions
De-register the consumer for a partition.
De-register the producer for a partition.
Get connection to a Kafka broker.
Get the connection to the Kafka broker that is the leader for the given topic-partition.
Get topic metadata; if the topic is undefined, fetch ALL metadata.
Get producer of the given topic-partition.
Register self() as a partition consumer.
Register self() as a partition producer.
Dynamically start a topic consumer.
Dynamically start a per-topic producer.
Types
-type client() :: brod:client().
-type client_id() :: brod:client_id().
-type config() :: proplists:proplist().
-type conn_state() :: #conn{endpoint :: endpoint(), pid :: connection() | dead_conn()}.
-type connection() :: kpro:connection().
-type dead_conn() :: {dead_since, erlang:timestamp(), any()}.
-type endpoint() :: brod:endpoint().
-type get_worker_error() :: get_producer_error() | get_consumer_error().
-type group_id() :: brod:group_id().
-type partition() :: brod:partition().
-type state() :: #state{client_id :: client_id(), bootstrap_endpoints :: [endpoint()], meta_conn :: undefined | connection(), payload_conns :: [conn_state()], producers_sup :: undefined | pid(), consumers_sup :: undefined | pid(), config :: undefined | config(), workers_tab :: undefined | ets:tab()}.
-type topic() :: brod:topic().
-type transactional_id() :: brod:transactional_id().
Functions
De-register the consumer for a partition.
The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully stopped consumers and allow later restart.
De-register the producer for a partition.
The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.
-spec find_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
-spec find_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.
-spec get_connection(client(), brod:hostname(), brod:portnum()) -> {ok, pid()} | {error, any()}.
Get connection to a Kafka broker.
Returns the already established connection to the broker if there is one; otherwise a new one is established and cached in the client state. If the old connection died less than a configurable N seconds ago, {error, LastReason} is returned.
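As a rough sketch of how a caller might handle the two documented outcomes of get_connection/3, consider the module below. The module name conn_example and the helper describe/1 are hypothetical, and the code assumes that brod is running and that Client refers to an already started client.

```erlang
-module(conn_example).
-export([broker_connection/3, describe/1]).

%% Ask the client for a connection to the broker at Host:Port.
%% Assumption: Client is an already started brod client.
broker_connection(Client, Host, Port) ->
    describe(brod_client:get_connection(Client, Host, Port)).

%% Hypothetical helper: tag the documented return values so that
%% callers can log or branch on them.
%% {error, Reason} covers both a failed connect attempt and the case
%% where the previous connection died too recently to be retried.
describe({ok, Pid}) when is_pid(Pid) -> {connected, Pid};
describe({error, Reason}) -> {unavailable, Reason}.
```

A caller that receives {unavailable, Reason} would typically wait before retrying, since a retry made too soon returns the cached error rather than a new connection.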
-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
-spec get_group_coordinator(client(), group_id()) -> {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
Get the connection to the Kafka broker that is the leader for the given topic-partition.
Returns the already established connection to the leader broker if there is one; otherwise a new one is established and cached in the client state. If the old connection died less than a configurable N seconds ago, {error, LastReason} is returned.
-spec get_metadata(client(), all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}.
Get topic metadata. If the topic is undefined, ALL metadata is fetched.
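The two ways of calling get_metadata/2 can be sketched as follows. This is only an illustration: the module name metadata_example, the atom all_topics, and the helper target/1 are hypothetical, and the code assumes a started client and binary topic names.

```erlang
-module(metadata_example).
-export([fetch/2, target/1]).

%% Hypothetical helper: translate a friendlier request into the
%% argument accepted by get_metadata/2.
%% all_topics is an invented atom, mapped to the documented undefined.
target(all_topics) -> undefined;
target(Topic) when is_binary(Topic) -> Topic.

%% Fetch metadata for one topic, or for all topics.
%% Returns {ok, Metadata} | {error, Reason} as in the spec above.
fetch(Client, What) ->
    brod_client:get_metadata(Client, target(What)).
```

For example, metadata_example:fetch(Client, <<"my-topic">>) asks for a single topic, while metadata_example:fetch(Client, all_topics) asks for everything; the topic name here is a placeholder.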
-spec get_metadata_safe(client(), topic()) -> {ok, kpro:struct()} | {error, any()}.
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
-spec get_partitions_count_safe(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.
Get producer of the given topic-partition.
The producer is started if auto_start_producers is enabled in client config.
-spec get_transactional_coordinator(client(), transactional_id()) -> {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
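To illustrate the auto_start_producers behaviour described for get_producer/3, here is a minimal sketch that starts a client with that option enabled and then looks up a partition producer. The module name auto_producer_example and both function names are hypothetical; the endpoints, client id, and topic are supplied by the caller, and a reachable Kafka cluster is assumed.

```erlang
-module(auto_producer_example).
-export([client_config/0, start_and_get/4]).

%% Client config enabling on-demand producer start.
client_config() ->
    [{auto_start_producers, true}].

%% Start a client, then ask for the producer of Topic-Partition.
%% Assumption: Endpoints is a list such as [{"localhost", 9092}]
%% and ClientId is an atom not yet used by another client.
start_and_get(Endpoints, ClientId, Topic, Partition) ->
    {ok, _Apps} = application:ensure_all_started(brod),
    case brod:start_client(Endpoints, ClientId, client_config()) of
        ok ->
            %% With auto_start_producers enabled, the producer is
            %% started by this call if it is not running yet.
            brod_client:get_producer(ClientId, Topic, Partition);
        {error, Reason} ->
            {error, Reason}
    end.
```

Without the option, the same lookup returns an error until a producer for the topic has been started explicitly.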
-spec lookup_partitions_count_cache(ets:tab(), undefined | topic()) -> {ok, pos_integer()} | {error, any()} | false.
Register self() as a partition consumer.
The pid is registered in an ETS table, so that callers can look up a consumer pid from the table and make subscribe calls to the process directly.
Register self() as a partition producer.
The pid is registered in an ETS table, so that callers can look up a producer pid from the table and make produce requests to the producer process directly.
-spec start_consumer(client(), topic(), brod:consumer_config()) -> ok | {error, any()}.
Dynamically start a topic consumer.
Returns ok if the consumer is already started.
-spec start_producer(client(), topic(), brod:producer_config()) -> ok | {error, any()}.
Dynamically start a per-topic producer.
Returns ok if the producer is already started.
-spec stop(client()) -> ok.
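Because start_producer/3 and start_consumer/3 both return ok when the worker is already started, they can be called defensively before use. The sketch below shows one way to do that; the module name workers_example and the helper first_error/1 are hypothetical, and the empty config lists assume that default producer and consumer settings are acceptable.

```erlang
-module(workers_example).
-export([ensure_workers/2, first_error/1]).

%% Make sure a producer and a consumer exist for Topic.
%% Both calls are made; the first error, if any, is returned.
%% Assumption: Client is an already started brod client.
ensure_workers(Client, Topic) ->
    first_error([brod_client:start_producer(Client, Topic, []),
                 brod_client:start_consumer(Client, Topic, [])]).

%% Hypothetical helper: ok if every result is ok, otherwise the
%% first {error, Reason} in the list.
first_error([]) -> ok;
first_error([ok | Rest]) -> first_error(Rest);
first_error([{error, _} = Err | _]) -> Err.
```

Calling workers_example:ensure_workers(Client, Topic) repeatedly is harmless under the documented behaviour, since a second call finds the workers already started.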