View Source brod_client (brod v3.17.0)
A brod_client
in brod is a gen_server
responsible for establishing and maintaining tcp sockets connecting to kafka brokers. It also manages per-topic-partition producer and consumer processes under two-level supervision trees.
Link to this section Summary
Functions
De-register the consumer for a partition.
De-register the producer for a partition.
Get connection to a kafka broker.
Get the connection to kafka broker which is a leader for given Topic-Partition.
undefined
, will fetch ALL metadata.Get producer of the given topic-partition.
Register self() as a partition consumer.
Register self() as a partition producer.
Dynamically start a topic consumer.
Dynamically start a per-topic producer.
Link to this section Types
-type client() :: brod:client().
-type client_id() :: brod:client_id().
-type config() :: proplists:proplist().
-type conn_state() :: #conn{}.
-type connection() :: kpro:connection().
-type dead_conn() :: {dead_since, erlang:timestamp(), any()}.
-type endpoint() :: brod:endpoint().
-type get_worker_error() :: get_producer_error() | get_consumer_error().
-type group_id() :: brod:group_id().
-type partition() :: brod:partition().
-type state() :: #state{}.
-type topic() :: brod:topic().
Link to this section Functions
De-register the consumer for a partition.
The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully %% stopped consumers and allow later restart.De-register the producer for a partition.
The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.-spec find_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
-spec find_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.
-spec get_connection(client(), brod:hostname(), brod:portnum()) -> {ok, pid()} | {error, any()}.
Get connection to a kafka broker.
Return already established connection towards the broker, otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago,{error, LastReason}
is returned.
-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
-spec get_group_coordinator(client(), group_id()) -> {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
Get the connection to kafka broker which is a leader for given Topic-Partition.
Return already established connection towards the leader broker, Otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago,{error, LastReason}
is returned.
-spec get_metadata(client(), all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}.
undefined
, will fetch ALL metadata.
-spec get_metadata_safe(client(), topic()) -> {ok, kpro:struct()} | {error, any()}.
-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.
Get producer of the given topic-partition.
The producer is started if auto_start_producers is enabled in client config.Register self() as a partition consumer.
The pid is registered in an ETS table, then the callers may lookup a consumer pid from the table and make subscribe calls to the process directly.Register self() as a partition producer.
The pid is registered in an ETS table, then the callers may lookup a producer pid from the table and make produce requests to the producer process directly.-spec start_consumer(client(), topic(), brod:consumer_config()) -> ok | {error, any()}.
Dynamically start a topic consumer.
Returns ok if the consumer is already started.-spec start_producer(client(), topic(), brod:producer_config()) -> ok | {error, any()}.
Dynamically start a per-topic producer.
Return ok if the producer is already started.-spec stop(client()) -> ok.