Behaviours: application.
batch_input() = [msg_input()]
bootstrap() = [endpoint()] | {[endpoint()], client_config()}
call_ref() = #brod_call_ref{caller = undefined | pid(), callee = undefined | pid(), ref = undefined | reference()}
cg() = #brod_cg{id = brod:group_id(), protocol_type = brod:cg_protocol_type()}
cg_protocol_type() = binary()
client() = client_id() | pid()
client_config() = brod_client:config()
client_id() = atom()
compression() = no_compression | gzip | snappy
conn_config() = [{atom(), term()}] | kpro:conn_config()
connection() = kpro:connection()
consumer_config() = brod_consumer:config()
consumer_option() = begin_offset | min_bytes | max_bytes | max_wait_time | sleep_timeout | prefetch_count | prefetch_bytes | offset_reset_policy | size_stat_window
consumer_options() = [{consumer_option(), integer()}]
endpoint() = {hostname(), portnum()}
error_code() = kpro:error_code()
fetch_opts() = kpro:fetch_opts()
fold_acc() = term()
fold_fun(Acc) = fun((message(), Acc) -> {ok, Acc} | {error, any()})
fold
always returns when reaches the high watermark offset. fold
also returns when any of the limits is hit.
fold_limits() = #{message_count => pos_integer(), reach_offset => offset()}
fold_result() = {fold_acc(), OffsetToContinue::offset(), fold_stop_reason()}
fold_stop_reason() = reached_end_of_partition | reached_message_count_limit | reached_target_offset | {error, any()}
OffsetToContinue: begin offset for the next fold call
group_config() = proplists:proplist()
group_generation_id() = non_neg_integer()
group_id() = kpro:group_id()
group_member() = {group_member_id(), #kafka_group_member_metadata{version = non_neg_integer(), topics = [brod:topic()], user_data = binary()}}
group_member_id() = binary()
hostname() = kpro:hostname()
key() = undefined | binary()
message() = kpro:message()
message_set() = #kafka_message_set{topic = brod:topic(), partition = brod:partition(), high_wm_offset = integer(), messages = [brod:message()] | kpro:incomplete_batch()}
msg_input() = kpro:msg_input()
msg_ts() = kpro:msg_ts()
offset() = kpro:offset()
offset_time() = integer() | earliest | latest
partition() = kpro:partition()
partition_assignment() = {topic(), [partition()]}
partition_fun() = fun((topic(), pos_integer(), key(), value()) -> {ok, partition()})
partitioner() = partition_fun() | random | hash
portnum() = pos_integer()
produce_ack_cb() = fun((partition(), offset()) -> term())
produce_reply() = #brod_produce_reply{call_ref = brod:call_ref(), base_offset = undefined | brod:offset(), result = brod:produce_result()}
produce_result() = brod_produce_req_buffered | brod_produce_req_acked
producer_config() = brod_producer:config()
received_assignments() = [#brod_received_assignment{topic = brod:topic(), partition = brod:partition(), begin_offset = undefined | brod:offset()}]
topic() = kpro:topic()
topic_config() = kpro:struct()
topic_partition() = {topic(), partition()}
value() = undefined | iodata() | {msg_ts(), binary()} | [{key(), value()}] | [{msg_ts(), key(), value()}] | kpro:msg_input() | kpro:batch_input()
connect_group_coordinator/3 | Connect to consumer group coordinator broker. |
connect_leader/4 | Connect partition leader. |
consume_ack/2 | |
consume_ack/4 | |
create_topics/3 | Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []). |
create_topics/4 | Create topic(s) in kafka
Return the message body of create_topics , response. |
delete_topics/3 | Equivalent to delete_topics(Hosts, Topics, Timeout, []). |
delete_topics/4 | Delete topic(s) from kafka
Return the message body of delete_topics , response. |
describe_groups/3 | Describe consumer groups. |
fetch/4 | Fetch a single message set from the given topic-partition. |
fetch/5 | Fetch a single message set from the given topic-partition. |
fetch/7 | (Deprecated.) |
fetch/8 | (Deprecated.) |
fetch_committed_offsets/2 | Same as fetch_committed_offsets/3 ,
but works with a started brod_client |
fetch_committed_offsets/3 | Fetch committed offsets for ALL topics in the given consumer group. |
fold/8 | Fold through messages in a partition. |
get_consumer/3 | |
get_metadata/1 | Fetch broker metadata
Return the message body of metadata response. |
get_metadata/2 | Fetch broker/topic metadata
Return the message body of metadata response. |
get_metadata/3 | Fetch broker/topic metadata
Return the message body of metadata response. |
get_partitions_count/2 | Get number of partitions for a given topic. |
get_producer/3 | Equivalent to brod_client:get_producer / 3. |
list_all_groups/2 | List ALL consumer groups in the given kafka cluster. |
list_groups/2 | List consumer groups in the given group coordinator broker. |
main/1 | |
produce/2 | Equivalent to produce(Pid, 0, <<>>, Value). |
produce/3 | Produce one message if Value is a binary or an
iolist. |
produce/5 | Produce one message if Value is a binary or an iolist. |
produce_cb/4 | Same as produce/3 , only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka. |
produce_cb/6 | Same as produce/5 only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka. |
produce_no_ack/5 | Find the partition worker and send message without any ack. |
produce_sync/2 | Same as produce/5 only the ack is not d. |
produce_sync/3 | Sync version of produce/3. |
produce_sync/5 | Sync version of produce/5 This function will not return until a response is received from kafka, however if producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process. |
produce_sync_offset/5 | Version of produce_sync/5 that returns the offset assigned by Kafka
If producer is started with required_acks set to 0, the offset will be
?BROD_PRODUCE_UNKNOWN_OFFSET . |
resolve_offset/3 | Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1). |
resolve_offset/4 | Resolve semantic offset or timestamp to real offset. |
resolve_offset/5 | Resolve semantic offset or timestamp to real offset. |
resolve_offset/6 | Resolve semantic offset or timestamp to real offset. |
start/0 | Start brod application. |
start/2 | Application behaviour callback. |
start_client/1 | Equivalent to start_client(BootstrapEndpoints, brod_default_client). |
start_client/2 | Equivalent to start_client(BootstrapEndpoints, ClientId, []). |
start_client/3 | Start a client. |
start_consumer/3 | Dynamically start a topic consumer. |
start_link_client/1 | Equivalent to start_link_client(BootstrapEndpoints, brod_default_client). |
start_link_client/2 | Equivalent to start_link_client(BootstrapEndpoints, ClientId, []). |
start_link_client/3 | |
start_link_group_subscriber/7 | |
start_link_group_subscriber/8 | |
start_link_group_subscriber_v2/1 | Start group_subscriber_v2. |
start_link_topic_subscriber/1 | |
start_link_topic_subscriber/5 | (Deprecated.) Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg). |
start_link_topic_subscriber/6 | (Deprecated.) Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg). |
start_link_topic_subscriber/7 | (Deprecated.) |
start_producer/3 | Dynamically start a per-topic producer. |
stop/0 | Stop brod application. |
stop/1 | Application behaviour callback. |
stop_client/1 | Stop a client. |
subscribe/3 | |
subscribe/5 | Subscribe to a data stream from the given topic-partition. |
sync_produce_request/1 | Block wait for sent produced request to be acked by kafka. |
sync_produce_request/2 | |
sync_produce_request_offset/1 | As sync_produce_request_offset/1, but also returning assigned offset See produce_sync_offset/5. |
sync_produce_request_offset/2 | |
unsubscribe/1 | Unsubscribe the current subscriber. |
unsubscribe/2 | Unsubscribe the current subscriber. |
unsubscribe/3 | Unsubscribe the current subscriber. |
unsubscribe/4 | Unsubscribe the current subscriber. |
connect_group_coordinator(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, pid()} | {error, any()}
Connect to consumer group coordinator broker.
Done in steps:connect_leader(Hosts::[endpoint()], Topic::topic(), Partition::partition(), ConnConfig::conn_config()) -> {ok, pid()}
Connect partition leader.
consume_ack(ConsumerPid::pid(), Offset::offset()) -> ok | {error, any()}
consume_ack(Client::client(), Topic::topic(), Partition::partition(), Offset::offset()) -> ok | {error, any()}
create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32(), validate_only => boolean()}) -> ok | {ok, kpro:struct()} | {error, any()}
Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32(), validate_only => boolean()}, Options::conn_config()) -> ok | {ok, kpro:struct()} | {error, any()}
Create topic(s) in kafka
Return the message body of create_topics
, response.
See kpro_schema.erl
for struct details
delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer()) -> ok | {ok, kpro:struct()} | {error, any()}
Equivalent to delete_topics(Hosts, Topics, Timeout, []).
delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer(), Options::conn_config()) -> ok | {ok, kpro:struct()} | {error, any()}
Delete topic(s) from kafka
Return the message body of delete_topics
, response.
See kpro_schema.erl
for struct details
describe_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config(), IDs::[group_id()]) -> {ok, [kpro:struct()]} | {error, any()}
Describe consumer groups. The given consumer group IDs should be all
managed by the coordinator-broker running at the given endpoint.
Otherwise error codes will be returned in the result structs.
Return describe_groups
response body field named groups
.
See kpro_schema.erl
for struct details
fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::integer()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}
Fetch a single message set from the given topic-partition.
The first arg can either be an already established connection to leader,
or {Endpoints, ConnConfig}
so to establish a new connection before fetch.
fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}
Fetch a single message set from the given topic-partition.
The first arg can either be an already established connection to leader,
or {Endpoints, ConnConfig}
so to establish a new connection before fetch.
fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer()) -> {ok, [message()]} | {error, any()}
This function is deprecated: fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, [])
fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer(), ConnConfig::conn_config()) -> {ok, [message()]} | {error, any()}
This function is deprecated: Fetch a single message set from the given topic-partition.
fetch_committed_offsets(Client::client(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}
Same as fetch_committed_offsets/3
,
but works with a started brod_client
fetch_committed_offsets(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}
Fetch committed offsets for ALL topics in the given consumer group.
Return the responses
field of the offset_fetch
response.
See kpro_schema.erl
for struct details.
fold(Bootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts(), Acc, Fun::fold_fun(Acc), Limits::fold_limits()) -> fold_result()
Fold through messages in a partition.
Works like lists:foldl/2
but with below stop conditions:
* Always return after reach high watermark offset
* Return after the given message count limit is reached
* Return after the given kafka offset is reached.
* Return if the FoldFun
returns an {error, Reason}
tuple.
NOTE: Exceptions from evaluating FoldFun
are not caught.
get_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}
get_metadata(Hosts::[endpoint()]) -> {ok, kpro:struct()} | {error, any()}
Fetch broker metadata
Return the message body of metadata
response.
See kpro_schema.erl
for details
get_metadata(Hosts::[endpoint()], Topics::all | [topic()]) -> {ok, kpro:struct()} | {error, any()}
Fetch broker/topic metadata
Return the message body of metadata
response.
See kpro_schema.erl
for struct details
get_metadata(Hosts::[endpoint()], Topics::all | [topic()], Options::conn_config()) -> {ok, kpro:struct()} | {error, any()}
Fetch broker/topic metadata
Return the message body of metadata
response.
See kpro_schema.erl
for struct details
Get number of partitions for a given topic. The higher level producers may need the partition numbers to find the partition producer pid --- if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions.
get_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}
Equivalent to brod_client:get_producer / 3.
list_all_groups(Endpoints::[endpoint()], ConnCfg::conn_config()) -> [{endpoint(), [cg()] | {error, any()}}]
List ALL consumer groups in the given kafka cluster. NOTE: Exception if failed to connect any of the coordinator brokers.
list_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config()) -> {ok, [cg()]} | {error, any()}
List consumer groups in the given group coordinator broker.
main(X) -> any()
produce(Pid::pid(), Value::value()) -> {ok, call_ref()} | {error, any()}
Equivalent to produce(Pid, 0, <<>>, Value).
produce(ProducerPid::pid(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}
Produce one message if Value
is a binary or an
iolist. Otherwise send a batch, if Value
is a (nested) key-value
list, or a list of maps. In this case Key
is discarded (only the
keys in the key-value list are sent to Kafka). The pid should be a
partition producer pid, NOT client pid. The return value is a call
reference of type call_ref()
, so the caller can use it to expect
(match) a #brod_produce_reply{result = brod_produce_req_acked}
message after the produce request has been acked by Kafka.
produce(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}
Produce one message if Value
is a binary or an iolist.
Otherwise send a batch if Value
is a (nested) key-value list, or
a list of maps. In this case Key
is used only for partitioning,
or discarded if the 3rd argument is a partition number instead of a
partitioner callback. This function first looks up the producer
pid, then calls produce/3
to do the real work. The return value
is a call reference of type call_ref()
, so the caller can used it
to expect (match) a #brod_produce_reply{result =
brod_produce_req_acked}
message after the produce request has been
acked by Kafka.
produce_cb(ProducerPid::pid(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {error, any()}
Same as produce/3
, only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka.
produce_cb(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {ok, partition()} | {error, any()}
Same as produce/5
only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka. Return the partition to caller as {ok, Partition}
for caller
to correlate the callback when the 3rd arg is not a partition number.
produce_no_ack(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}
Find the partition worker and send message without any ack. NOTE: This call has no back-pressure to the caller, excessive usage may cause BEAM to run out of memory.
produce_sync(Pid::pid(), Value::value()) -> ok
Equivalent to produce_sync(Pid, 0, <<>>, Value).
Same as produce/5
only the ack is not d
Sync version of produce/3
This function will not return until the response is received from Kafka. But when producer is started withrequired_acks
set to 0,
this function will return once the messages are buffered in the
producer process.
produce_sync(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}
Sync version of produce/5 This function will not return until a response is received from kafka, however if producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
produce_sync_offset(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}
Version of produce_sync/5 that returns the offset assigned by Kafka
If producer is started with required_acks set to 0, the offset will be
?BROD_PRODUCE_UNKNOWN_OFFSET
.
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition()) -> {ok, offset()} | {error, any()}
Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1).
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time()) -> {ok, offset()} | {error, any()}
Resolve semantic offset or timestamp to real offset.
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config()) -> {ok, offset()} | {error, any()}
Resolve semantic offset or timestamp to real offset.
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config(), Opts::#{timeout => kpro:int32()}) -> {ok, offset()} | {error, any()}
Resolve semantic offset or timestamp to real offset.
start() -> ok | no_return()
Start brod application.
start(StartType, StartArgs) -> any()
Application behaviour callback
start_client(BootstrapEndpoints::[endpoint()]) -> ok | {error, any()}
Equivalent to start_client(BootstrapEndpoints, brod_default_client).
start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> ok | {error, any()}
Equivalent to start_client(BootstrapEndpoints, ClientId, []).
start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> ok | {error, any()}
Start a client.
BootstrapEndpoints
:
Kafka cluster endpoints, can be any of the brokers in the cluster,
which does not necessarily have to be the leader of any partition,
e.g. a load-balanced entrypoint to the remote Kafka cluster.
ClientId
: Atom to identify the client process.
Config
is a proplist, possible values:
restart_delay_seconds
(optional, default=10)
get_metadata_timeout_seconds
(optional, default=5)
{error, timeout}
from brod_client:get_xxx
calls if
responses for APIs such as metadata
, find_coordinator
are not received in time.reconnect_cool_down_seconds
(optional, default=1)
allow_topic_auto_creation
(optional, default=true)
auto.create.topics.enable
is set in the broker configuration.
However if allow_topic_auto_creation
is set to false
in
client config, brod will avoid sending metadata requests that
may cause an auto-creation of the topic regardless of what
broker config is.auto_start_producers
(optional, default=false)
produce
but did not call brod:start_producer
explicitly. Can be useful for applications which don't know beforehand
which topics they will be working with.default_producer_config
(optional, default=[])
brod_producer:start_link/4
for details about producer configConnection options can be added to the same proplist. See
kpro_connection.erl
in kafka_protocol
for the details:
ssl
(optional, default=false)
true | false | ssl:ssl_option()
true
is translated to []
as ssl:ssl_option()
i.e. all default.
sasl
(optional, default=undefined)
{mechanism(), Filename}
or {mechanism(), UserName, Password}
where mechanism can be atoms: plain
(for "PLAIN"), scram_sha_256
(for "SCRAM-SHA-256") or scram_sha_512
(for SCRAM-SHA-512).
Filename
should be a file consisting two lines, first line
is the username and the second line is the password.
Both Username
and Password
should be string() | binary()
connect_timeout
(optional, default=5000)
request_timeout
(optional, default=240000, constraint: >= 1000)
query_api_versions
(optional, default=true)
true
, at connection start, brod will send a query request
to get the broker supported API version ranges.
When set to 'false', brod will alway use the lowest supported API version
when sending requests to kafka.
Supported API version ranges can be found in:
brod_kafka_apis:supported_versions/1
extra_sock_opts
(optional, default=[])
[{sndbuf, 1 bsl 20}]
.
More info
start_consumer(Client::client(), TopicName::topic(), ConsumerConfig::consumer_config()) -> ok | {error, any()}
Dynamically start a topic consumer.
See also: for details about consumer config..
start_link_client(BootstrapEndpoints::[endpoint()]) -> {ok, pid()} | {error, any()}
Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> {ok, pid()} | {error, any()}
Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> {ok, pid()} | {error, any()}
start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
See also: brod_group_subscriber:start_link/7.
start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
See also: brod_group_subscriber:start_link/8.
start_link_group_subscriber_v2(Config::brod_group_subscriber_v2:subscriber_config()) -> {ok, pid()} | {error, any()}
Start group_subscriber_v2
start_link_topic_subscriber(Config::brod_topic_subscriber:topic_subscriber_config()) -> {ok, pid()} | {error, any()}
See also: brod_topic_subscriber:start_link/1.
start_link_topic_subscriber(Client::client(), Topic::topic(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).
This function is deprecated: Please use start_link_topic_subscriber/1
instead
start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).
This function is deprecated: Please use start_link_topic_subscriber/1
instead
start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
This function is deprecated: Please use start_link_topic_subscriber/1
instead
See also: brod_topic_subscriber:start_link/7.
start_producer(Client::client(), TopicName::topic(), ProducerConfig::producer_config()) -> ok | {error, any()}
Dynamically start a per-topic producer.
See also: brod_producer:start_link/4.
stop() -> ok
Stop brod application.
stop(State) -> any()
Application behaviour callback
stop_client(Client::client()) -> ok
Stop a client.
subscribe(ConsumerPid::pid(), SubscriberPid::pid(), Options::consumer_options()) -> ok | {error, any()}
subscribe(Client::client(), SubscriberPid::pid(), Topic::topic(), Partition::partition(), Options::consumer_options()) -> {ok, pid()} | {error, any()}
Subscribe to a data stream from the given topic-partition.
If {error, Reason}
is returned, the caller should perhaps retry later.
{ok, ConsumerPid}
is returned on success. The caller may want to
monitor the consumer pid and re-subscribe should the ConsumerPid
crash.
Upon successful subscription the subscriber process should expect messages
of pattern:
{ConsumerPid, #kafka_message_set{}}
and
{ConsumerPid, #kafka_fetch_error{}}
.
-include_lib("brod/include/brod.hrl")
to access the records.
#kafka_fetch_error{}
is received the subscriber should
re-subscribe itself to resume the data stream.
sync_produce_request(CallRef::call_ref()) -> ok | {error, Reason::any()}
Block wait for sent produced request to be acked by kafka.
sync_produce_request(CallRef::call_ref(), Timeout::timeout()) -> ok | {error, Reason::any()}
sync_produce_request_offset(CallRef::call_ref()) -> {ok, offset()} | {error, Reason::any()}
As sync_produce_request_offset/1, but also returning assigned offset See produce_sync_offset/5.
sync_produce_request_offset(CallRef::call_ref(), Timeout::timeout()) -> {ok, offset()} | {error, Reason::any()}
unsubscribe(ConsumerPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber. Assuming the subscriber is
self()
.
unsubscribe(ConsumerPid::pid(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
unsubscribe(Client::client(), Topic::topic(), Partition::partition()) -> ok | {error, any()}
Unsubscribe the current subscriber. Assuming the subscriber is
self()
.
unsubscribe(Client::client(), Topic::topic(), Partition::partition(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
Generated by EDoc