brod_client (brod v3.16.3)

Summary

Functions

deregister_consumer(Client, Topic, Partition): De-register the consumer for a partition. The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully stopped consumers and allow later restart.
deregister_producer(Client, Topic, Partition): De-register the producer for a partition. The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.
get_connection(Client, Host, Port): Get a connection to a Kafka broker. Returns the already established connection to the broker if there is one; otherwise a new connection is established and cached in the client state. If the previous connection died less than a configurable N seconds ago, {error, LastReason} is returned.
get_consumer(Client, Topic, Partition): Get the consumer of the given topic-partition.
get_group_coordinator(Client, GroupId): Get the broker endpoint and connection config for connecting to a group coordinator.
get_leader_connection(Client, Topic, Partition): Get the connection to the Kafka broker that is the leader for the given topic-partition. Returns the already established connection to the leader broker if there is one; otherwise a new connection is established and cached in the client state. If the previous connection died less than a configurable N seconds ago, {error, LastReason} is returned.
get_metadata(Client, Topic): Get topic metadata. If Topic is undefined, metadata for all topics is fetched.
get_partitions_count(Client, Topic): Get the number of partitions for a given topic.
get_producer(Client, Topic, Partition): Get the producer of the given topic-partition. The producer is started if auto_start_producers is enabled in the client config.
register_consumer(Client, Topic, Partition): Register self() as a partition consumer. The pid is registered in an ETS table so that callers can look up a consumer pid from the table and make subscribe calls to the process directly.
register_producer(Client, Topic, Partition): Register self() as a partition producer. The pid is registered in an ETS table so that callers can look up a producer pid from the table and make produce requests to the producer process directly.
start_consumer(Client, TopicName, ConsumerConfig): Dynamically start a topic consumer. Returns ok if the consumer is already started.
start_producer(Client, TopicName, ProducerConfig): Dynamically start a per-topic producer. Returns ok if the producer is already started.
stop_consumer(Client, TopicName): Stop all partition consumers of the given topic.
stop_producer(Client, TopicName): Stop all partition producers of the given topic.

Types

-type client() :: brod:client().
-type client_id() :: brod:client_id().
-type config() :: proplists:proplist().
-type conn_state() :: #conn{}.
-type connection() :: kpro:connection().
-type dead_conn() :: {dead_since, erlang:timestamp(), any()}.
-type endpoint() :: brod:endpoint().
-type get_consumer_error() ::
    client_down |
    {client_down, any()} |
    {consumer_down, any()} |
    {consumer_not_found, topic()} |
    {consumer_not_found, topic(), partition()}.
-type get_producer_error() ::
    client_down |
    {client_down, any()} |
    {producer_down, any()} |
    {producer_not_found, topic()} |
    {producer_not_found, topic(), partition()}.
-type get_worker_error() :: get_producer_error() | get_consumer_error().
-type group_id() :: brod:group_id().
-type partition() :: brod:partition().
-type partition_worker_key() :: {producer, topic(), partition()} | {consumer, topic(), partition()}.
-type state() :: #state{}.
-type topic() :: brod:topic().
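
The error types above are plain atoms and tagged tuples, so callers can pattern match on them directly. The following is a minimal sketch of that idea; the module name worker_error_example and the function describe/1 are hypothetical helpers, not part of brod.

```erlang
-module(worker_error_example).
-export([describe/1]).

%% Hypothetical helper (not part of brod): render a get_worker_error()
%% term as a flat, human-readable string.
-spec describe(term()) -> string().
describe(client_down) ->
    "client is down";
describe({client_down, Reason}) ->
    fmt("client is down: ~p", [Reason]);
describe({producer_down, Reason}) ->
    fmt("producer is down: ~p", [Reason]);
describe({consumer_down, Reason}) ->
    fmt("consumer is down: ~p", [Reason]);
describe({producer_not_found, Topic}) ->
    fmt("no producer for topic ~p", [Topic]);
describe({producer_not_found, Topic, Partition}) ->
    fmt("no producer for topic ~p, partition ~p", [Topic, Partition]);
describe({consumer_not_found, Topic}) ->
    fmt("no consumer for topic ~p", [Topic]);
describe({consumer_not_found, Topic, Partition}) ->
    fmt("no consumer for topic ~p, partition ~p", [Topic, Partition]);
describe(Other) ->
    %% Anything outside the documented error types.
    fmt("unexpected error: ~p", [Other]).

%% io_lib:format/2 returns a deep list; flatten it into a plain string.
fmt(Format, Args) ->
    lists:flatten(io_lib:format(Format, Args)).
```

The catch-all clause is a design choice for logging code, where crashing on an unknown reason is usually less useful than printing it.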

Functions

deregister_consumer(Client, Topic, Partition)

-spec deregister_consumer(client(), topic(), partition()) -> ok.
De-register the consumer for a partition. The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully stopped consumers and allow later restart.

deregister_producer(Client, Topic, Partition)

-spec deregister_producer(client(), topic(), partition()) -> ok.
De-register the producer for a partition. The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.

find_consumer(Client, Topic, Partition)

-spec find_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.

find_producer(Client, Topic, Partition)

-spec find_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.

get_connection(Client, Host, Port)

-spec get_connection(client(), brod:hostname(), brod:portnum()) -> {ok, pid()} | {error, any()}.
Get a connection to a Kafka broker. Returns the already established connection to the broker if there is one; otherwise a new connection is established and cached in the client state. If the previous connection died less than a configurable N seconds ago, {error, LastReason} is returned.

get_consumer(Client, Topic, Partition)

-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, get_consumer_error()}.
Get the consumer of the given topic-partition.

get_group_coordinator(Client, GroupId)

-spec get_group_coordinator(client(), group_id()) ->
                         {ok, {endpoint(), brod:conn_config()}} | {error, any()}.
Get the broker endpoint and connection config for connecting to a group coordinator.

get_leader_connection(Client, Topic, Partition)

-spec get_leader_connection(client(), topic(), partition()) -> {ok, pid()} | {error, any()}.
Get the connection to the Kafka broker that is the leader for the given topic-partition. Returns the already established connection to the leader broker if there is one; otherwise a new connection is established and cached in the client state. If the previous connection died less than a configurable N seconds ago, {error, LastReason} is returned.
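
Because the result can be an error tuple carrying the last failure reason, callers typically branch on it rather than assume a connection. The sketch below shows one way to do that; the module brod_client_leader_example and the function with_leader/4 are hypothetical names, and it assumes a brod client is already running.

```erlang
-module(brod_client_leader_example).
-export([with_leader/4]).

%% Hypothetical helper (not part of brod): run Fun(Conn) against the
%% partition leader's connection, or pass the error through unchanged.
-spec with_leader(term(), binary(), non_neg_integer(), fun((pid()) -> T)) ->
          {ok, T} | {error, any()}.
with_leader(Client, Topic, Partition, Fun) when is_function(Fun, 1) ->
    case brod_client:get_leader_connection(Client, Topic, Partition) of
        {ok, Conn} ->
            {ok, Fun(Conn)};
        {error, Reason} ->
            %% Reason may be the cached reason of a recently dead
            %% connection, so retrying immediately may return it again.
            {error, Reason}
    end.
```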

get_metadata(Client, Topic)

-spec get_metadata(client(), all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}.
Get topic metadata. If Topic is undefined, metadata for all topics is fetched.

get_partitions_count(Client, Topic)

-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Get the number of partitions for a given topic.
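
A partition count is often turned into the list of partition numbers to iterate over. The following sketch does that under the assumption that a client is already running; the module brod_client_partitions_example and the function partitions/2 are hypothetical names.

```erlang
-module(brod_client_partitions_example).
-export([partitions/2]).

%% Hypothetical helper (not part of brod): list the partition numbers of
%% a topic. Kafka numbers partitions from 0, so a count of N yields
%% [0, 1, ..., N-1].
-spec partitions(term(), binary()) -> {ok, [non_neg_integer()]} | {error, any()}.
partitions(Client, Topic) ->
    case brod_client:get_partitions_count(Client, Topic) of
        {ok, Count} ->
            {ok, lists:seq(0, Count - 1)};
        {error, _} = Error ->
            Error
    end.
```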

get_producer(Client, Topic, Partition)

-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, get_producer_error()}.
Get the producer of the given topic-partition. The producer is started if auto_start_producers is enabled in the client config.
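
As an illustration of how the producer pid might be used, the sketch below starts the topic's producers explicitly, picks a partition, looks up its producer, and sends one message with brod:produce_sync/3. The module brod_client_produce_example and both functions are hypothetical; the hash-based partition choice is an assumption made for the example and is not Kafka's or brod's default partitioner.

```erlang
-module(brod_client_produce_example).
-export([produce_one/4, pick_partition/2]).

%% Hypothetical helper (not part of brod): produce one message and wait
%% for the broker's acknowledgement.
-spec produce_one(term(), binary(), binary(), binary()) -> ok | {error, any()}.
produce_one(Client, Topic, Key, Value) ->
    %% start_producer/3 returns ok if the producer is already started.
    ok = brod_client:start_producer(Client, Topic, []),
    {ok, Count} = brod_client:get_partitions_count(Client, Topic),
    Partition = pick_partition(Key, Count),
    {ok, Producer} = brod_client:get_producer(Client, Topic, Partition),
    brod:produce_sync(Producer, Key, Value).

%% Illustrative partition choice: erlang:phash2/2 maps any term to an
%% integer in 0..Count-1.
-spec pick_partition(term(), pos_integer()) -> non_neg_integer().
pick_partition(Key, Count) when is_integer(Count), Count > 0 ->
    erlang:phash2(Key, Count).
```

The hard matches on ok and {ok, _} make the function crash on any lookup failure; code that needs to recover would match on the get_producer_error() terms instead.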

register_consumer(Client, Topic, Partition)

-spec register_consumer(client(), topic(), partition()) -> ok.
Register self() as a partition consumer. The pid is registered in an ETS table so that callers can look up a consumer pid from the table and make subscribe calls to the process directly.

register_producer(Client, Topic, Partition)

-spec register_producer(client(), topic(), partition()) -> ok.
Register self() as a partition producer. The pid is registered in an ETS table so that callers can look up a producer pid from the table and make produce requests to the producer process directly.

start_consumer(Client, TopicName, ConsumerConfig)

-spec start_consumer(client(), topic(), brod:consumer_config()) -> ok | {error, any()}.
Dynamically start a topic consumer. Returns ok if the consumer is already started.
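
One possible flow after starting a topic consumer is to look up the consumer process for a partition and subscribe the calling process to it with brod:subscribe/3. This is a sketch under assumptions: the module brod_client_consume_example and the function subscribe_self/3 are hypothetical, and the empty consumer config and subscribe options rely on brod's defaults.

```erlang
-module(brod_client_consume_example).
-export([subscribe_self/3]).

%% Hypothetical helper (not part of brod): start the topic consumer if
%% needed and subscribe the calling process to one partition.
-spec subscribe_self(term(), binary(), non_neg_integer()) ->
          {ok, pid()} | {error, any()}.
subscribe_self(Client, Topic, Partition) ->
    %% start_consumer/3 returns ok if the consumer is already started.
    case brod_client:start_consumer(Client, Topic, []) of
        ok ->
            case brod_client:get_consumer(Client, Topic, Partition) of
                {ok, ConsumerPid} ->
                    case brod:subscribe(ConsumerPid, self(), []) of
                        ok -> {ok, ConsumerPid};
                        {error, _} = Error -> Error
                    end;
                {error, _} = Error ->
                    Error
            end;
        {error, _} = Error ->
            Error
    end.
```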

start_link(BootstrapEndpoints, ClientId, Config)

-spec start_link([endpoint()], client_id(), config()) -> {ok, pid()} | {error, any()}.
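
A minimal sketch of calling start_link/3 follows. It assumes a broker is reachable at localhost:9092; that address, the client id example_client, and the module brod_client_start_example are placeholders chosen for illustration.

```erlang
-module(brod_client_start_example).
-export([start/0]).

%% Hypothetical helper (not part of brod): start a client with default
%% settings. The client process is linked to the caller.
-spec start() -> {ok, pid()} | {error, any()}.
start() ->
    %% Make sure brod and the applications it depends on are running.
    {ok, _Started} = application:ensure_all_started(brod),
    %% Assumed bootstrap endpoint; replace with a real broker address.
    Endpoints = [{"localhost", 9092}],
    %% An empty proplist leaves every client option at its default.
    brod_client:start_link(Endpoints, example_client, []).
```

The returned pid can later be passed to stop/1 to shut the client down.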

start_producer(Client, TopicName, ProducerConfig)

-spec start_producer(client(), topic(), brod:producer_config()) -> ok | {error, any()}.
Dynamically start a per-topic producer. Returns ok if the producer is already started.

stop(Client)

-spec stop(client()) -> ok.

stop_consumer(Client, TopicName)

-spec stop_consumer(client(), topic()) -> ok | {error, any()}.
Stop all partition consumers of the given topic.

stop_producer(Client, TopicName)

-spec stop_producer(client(), topic()) -> ok | {error, any()}.
Stop all partition producers of the given topic.