Module brod_client

Behaviours: gen_server.

Data Types

client()

client() = brod:client()

client_id()

client_id() = brod:client_id()

config()

config() = proplists:proplist()

endpoint()

endpoint() = brod:endpoint()

get_consumer_error()

get_consumer_error() = client_down | {consumer_down, noproc} | {consumer_not_found, topic()} | {consumer_not_found, topic(), partition()}

get_producer_error()

get_producer_error() = client_down | {producer_down, noproc} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}

group_id()

group_id() = brod:group_id()

partition()

partition() = brod:partition()

topic()

topic() = brod:topic()

Function Index

deregister_consumer/3    De-register the consumer for a partition.
deregister_producer/3    De-register the producer for a partition.
find_consumer/3
find_producer/3
get_connection/3         Get a connection to a Kafka broker.
get_consumer/3           Get the consumer of the given topic-partition.
get_group_coordinator/2  Get the broker endpoint and connection config for connecting to a group coordinator.
get_leader_connection/3  Get the connection to the Kafka broker that is the leader for the given topic-partition.
get_metadata/2           Get topic metadata. If Topic is undefined, metadata for all topics is fetched.
get_partitions_count/2   Get the number of partitions for a given topic.
get_producer/3           Get the producer of the given topic-partition.
register_consumer/3      Register self() as a partition consumer.
register_producer/3      Register self() as a partition producer.
start_consumer/3         Dynamically start a topic consumer.
start_link/3
start_producer/3         Dynamically start a per-topic producer.
stop/1
stop_consumer/2          Stop all partition consumers of the given topic.
stop_producer/2          Stop all partition producers of the given topic.

Function Details

deregister_consumer/3

deregister_consumer(Client::client(), Topic::topic(), Partition::partition()) -> ok

De-register the consumer for a partition. The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully stopped consumers and allow later restart.

deregister_producer/3

deregister_producer(Client::client(), Topic::topic(), Partition::partition()) -> ok

De-register the producer for a partition. The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.

find_consumer/3

find_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_consumer_error()}

find_producer/3

find_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_producer_error()}

get_connection/3

get_connection(Client::client(), Host::brod:hostname(), Port::brod:portnum()) -> {ok, pid()} | {error, any()}

Get a connection to a Kafka broker. Returns the already established connection to the broker if there is one; otherwise a new one is established and cached in the client state. If the previous connection died less than a configurable N seconds ago, {error, LastReason} is returned.
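The caching rule described above can be illustrated with a small pure function. This is only a sketch of the decision, not brod's actual implementation: the module name, the cache entry shapes (none, {alive, Pid}, {dead, DiedAtSec, Reason}) and the connect return value are all assumptions made for illustration.

```erlang
-module(conn_cache_sketch).
-export([decide/3]).

%% Hypothetical cache entry shapes (not brod's internal state):
%%   none                      - no connection was ever made
%%   {alive, Pid}              - a connection is up
%%   {dead, DiedAtSec, Reason} - the last connection died at DiedAtSec

%% A live connection is reused as-is.
decide({alive, Pid}, _NowSec, _CoolDownSec) ->
    {ok, Pid};
%% The connection died less than CoolDownSec seconds ago:
%% report the last failure reason instead of reconnecting.
decide({dead, DiedAtSec, Reason}, NowSec, CoolDownSec)
  when NowSec - DiedAtSec < CoolDownSec ->
    {error, Reason};
%% Never connected, or the cool-down has passed: the caller
%% should establish a new connection and cache it.
decide(_NoneOrExpired, _NowSec, _CoolDownSec) ->
    connect.
```

For example, with a cool-down of 5 seconds, an entry that died at second 100 yields {error, Reason} at second 101 and connect at second 110.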

get_consumer/3

get_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_consumer_error()}

Get the consumer of the given topic-partition.

get_group_coordinator/2

get_group_coordinator(Client::client(), GroupId::group_id()) -> {ok, {endpoint(), brod:conn_config()}} | {error, any()}

Get the broker endpoint and connection config for connecting to a group coordinator.

get_leader_connection/3

get_leader_connection(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, any()}

Get the connection to the Kafka broker that is the leader for the given topic-partition. Returns the already established connection to the leader broker if there is one; otherwise a new one is established and cached in the client state. If the previous connection died less than a configurable N seconds ago, {error, LastReason} is returned.

get_metadata/2

get_metadata(Client::client(), Topic::all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}

Get topic metadata. If Topic is undefined, metadata for all topics is fetched.

get_partitions_count/2

get_partitions_count(Client::client(), Topic::topic()) -> {ok, pos_integer()} | {error, any()}

Get number of partitions for a given topic.

get_producer/3

get_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_producer_error()}

Get producer of the given topic-partition. The producer is started if auto_start_producers is enabled in client config.
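A caller typically has to deal with every alternative of get_producer_error(). The following is a minimal sketch of such a wrapper. The module name and the message texts are illustrative assumptions; in particular, the messages are one reading of the error atoms, not wording taken from brod.

```erlang
-module(producer_lookup_sketch).
-export([lookup/3, describe/1]).

%% Look up the producer pid for a topic-partition and translate
%% the documented error reasons into a readable message.
%% Calling this requires a running brod client.
lookup(Client, Topic, Partition) ->
    case brod_client:get_producer(Client, Topic, Partition) of
        {ok, Pid} when is_pid(Pid) ->
            {ok, Pid};
        {error, Reason} ->
            {error, describe(Reason)}
    end.

%% One clause per alternative of get_producer_error().
%% The message texts are assumptions made for this sketch.
describe(client_down) ->
    "client process is not running";
describe({producer_down, noproc}) ->
    "producer process is not running";
describe({producer_not_found, Topic}) ->
    lists:flatten(
      io_lib:format("no producer found for topic ~s", [Topic]));
describe({producer_not_found, Topic, Partition}) ->
    lists:flatten(
      io_lib:format("no producer found for topic ~s, partition ~p",
                    [Topic, Partition])).
```

The same shape applies to get_consumer/3 and get_consumer_error(), with consumer_down and consumer_not_found in place of the producer reasons.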

register_consumer/3

register_consumer(Client::client(), Topic::topic(), Partition::partition()) -> ok

Register self() as a partition consumer. The pid is registered in an ETS table, so that callers can look up a consumer pid from the table and make subscribe calls to the process directly.

register_producer/3

register_producer(Client::client(), Topic::topic(), Partition::partition()) -> ok

Register self() as a partition producer. The pid is registered in an ETS table, so that callers can look up a producer pid from the table and make produce requests to the producer process directly.
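The register, look up and de-register cycle can be sketched with a plain ETS table. This illustrates the general pattern only: the table name, the {Topic, Partition} key layout and the function names below are assumptions and do not reflect brod's actual table.

```erlang
-module(ets_registry_sketch).
-export([new/0, register_pid/3, deregister_pid/3, find_pid/3]).

%% Create a public set table; a hypothetical stand-in for the
%% table owned by the client process.
new() ->
    ets:new(registry_sketch, [set, public, {read_concurrency, true}]).

%% Register the calling process under {Topic, Partition}.
register_pid(Tab, Topic, Partition) ->
    true = ets:insert(Tab, {{Topic, Partition}, self()}),
    ok.

%% Delete the entry so that a later restart can register again.
deregister_pid(Tab, Topic, Partition) ->
    true = ets:delete(Tab, {Topic, Partition}),
    ok.

%% Callers read the table directly instead of asking the owner process.
find_pid(Tab, Topic, Partition) ->
    case ets:lookup(Tab, {Topic, Partition}) of
        [{_Key, Pid}] -> {ok, Pid};
        []            -> {error, not_found}
    end.
```

Because lookups read the table directly, they do not go through the owning process's message queue, which is the usual reason for keeping such a registry in ETS.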

start_consumer/3

start_consumer(Client::client(), TopicName::topic(), ConsumerConfig::brod:consumer_config()) -> ok | {error, any()}

Dynamically start a topic consumer. Returns ok if the consumer is already started.

start_link/3

start_link(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::config()) -> {ok, pid()} | {error, any()}
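As a rough usage sketch, the function below starts a client, queries the partition count of a topic and stops the client again. The module name and the client id sketch_client are hypothetical, the empty config list is an assumption, and running it requires the brod modules on the code path and a reachable broker.

```erlang
-module(client_lifecycle_sketch).
-export([run/2]).

%% Start a client, ask for the partition count of Topic, then stop
%% the client. Endpoints is a list of endpoint() values.
run(Endpoints, Topic) ->
    %% sketch_client is a hypothetical client id; [] is an empty config.
    {ok, Pid} = brod_client:start_link(Endpoints, sketch_client, []),
    Result = brod_client:get_partitions_count(Pid, Topic),
    ok = brod_client:stop(Pid),
    Result.
```

A call might look like client_lifecycle_sketch:run([{"localhost", 9092}], <<"my_topic">>), where both the broker address and the topic name are placeholders.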

start_producer/3

start_producer(Client::client(), TopicName::topic(), ProducerConfig::brod:producer_config()) -> ok | {error, any()}

Dynamically start a per-topic producer. Returns ok if the producer is already started.

stop/1

stop(Client::client()) -> ok

stop_consumer/2

stop_consumer(Client::client(), TopicName::topic()) -> ok | {error, any()}

Stop all partition consumers of the given topic.

stop_producer/2

stop_producer(Client::client(), TopicName::topic()) -> ok | {error, any()}

Stop all partition producers of the given topic.


Generated by EDoc