Module brod

Behaviours: application.

Data Types

batch_input()

batch_input() = [msg_input()]

bootstrap()

bootstrap() = [endpoint()] | {[endpoint()], client_config()}

call_ref()

call_ref() = #brod_call_ref{caller = undefined | pid(), callee = undefined | pid(), ref = undefined | reference()}

cg()

cg() = #brod_cg{id = brod:group_id(), protocol_type = brod:cg_protocol_type()}

cg_protocol_type()

cg_protocol_type() = binary()

client()

client() = client_id() | pid()

client_config()

client_config() = brod_client:config()

client_id()

client_id() = atom()

compression()

compression() = no_compression | gzip | snappy

conn_config()

conn_config() = [{atom(), term()}] | kpro:conn_config()

connection()

connection() = kpro:connection()

consumer_config()

consumer_config() = brod_consumer:config()

consumer_option()

consumer_option() = begin_offset | min_bytes | max_bytes | max_wait_time | sleep_timeout | prefetch_count | prefetch_bytes | offset_reset_policy | size_stat_window

consumer_options()

consumer_options() = [{consumer_option(), integer()}]

endpoint()

endpoint() = {hostname(), portnum()}

error_code()

error_code() = kpro:error_code()

fetch_opts()

fetch_opts() = kpro:fetch_opts()

fold_acc()

fold_acc() = term()

fold_fun()

fold_fun(Acc) = fun((message(), Acc) -> {ok, Acc} | {error, any()})

fold always returns when it reaches the high watermark offset; it also returns when any of the given limits is hit.

fold_limits()

fold_limits() = #{message_count => pos_integer(), reach_offset => offset()}

fold_result()

fold_result() = {fold_acc(), OffsetToContinue::offset(), fold_stop_reason()}

fold_stop_reason()

fold_stop_reason() = reached_end_of_partition | reached_message_count_limit | reached_target_offset | {error, any()}

OffsetToContinue: begin offset for the next fold call

group_config()

group_config() = proplists:proplist()

group_generation_id()

group_generation_id() = non_neg_integer()

group_id()

group_id() = kpro:group_id()

group_member()

group_member() = {group_member_id(), #kafka_group_member_metadata{version = non_neg_integer(), topics = [brod:topic()], user_data = binary()}}

group_member_id()

group_member_id() = binary()

hostname()

hostname() = kpro:hostname()

key()

key() = undefined | binary()

message()

message() = kpro:message()

message_set()

message_set() = #kafka_message_set{topic = brod:topic(), partition = brod:partition(), high_wm_offset = integer(), messages = [brod:message()] | kpro:incomplete_batch()}

msg_input()

msg_input() = kpro:msg_input()

msg_ts()

msg_ts() = kpro:msg_ts()

offset()

offset() = kpro:offset()

offset_time()

offset_time() = integer() | earliest | latest

partition()

partition() = kpro:partition()

partition_assignment()

partition_assignment() = {topic(), [partition()]}

partition_fun()

partition_fun() = fun((topic(), pos_integer(), key(), value()) -> {ok, partition()})

partitioner()

partitioner() = partition_fun() | random | hash

portnum()

portnum() = pos_integer()

produce_ack_cb()

produce_ack_cb() = fun((partition(), offset()) -> term())

produce_reply()

produce_reply() = #brod_produce_reply{call_ref = brod:call_ref(), base_offset = undefined | brod:offset(), result = brod:produce_result()}

produce_result()

produce_result() = brod_produce_req_buffered | brod_produce_req_acked

producer_config()

producer_config() = brod_producer:config()

received_assignments()

received_assignments() = [#brod_received_assignment{topic = brod:topic(), partition = brod:partition(), begin_offset = undefined | brod:offset()}]

topic()

topic() = kpro:topic()

topic_config()

topic_config() = kpro:struct()

topic_partition()

topic_partition() = {topic(), partition()}

value()

value() = undefined | iodata() | {msg_ts(), binary()} | [{key(), value()}] | [{msg_ts(), key(), value()}] | kpro:msg_input() | kpro:batch_input()

Function Index

connect_group_coordinator/3 - Connect to the consumer group coordinator broker.
connect_leader/4 - Connect to a partition leader.
consume_ack/2
consume_ack/4
create_topics/3 - Equivalent to create_topics(Hosts, TopicConfigs, RequestConfigs, []).
create_topics/4 - Create topic(s) in Kafka. Returns the message body of the create_topics response.
delete_topics/3 - Equivalent to delete_topics(Hosts, Topics, Timeout, []).
delete_topics/4 - Delete topic(s) from Kafka. Returns the message body of the delete_topics response.
describe_groups/3 - Describe consumer groups.
fetch/4 - Fetch a single message set from the given topic-partition.
fetch/5 - Fetch a single message set from the given topic-partition.
fetch/7 - (Deprecated.)
fetch/8 - (Deprecated.)
fetch_committed_offsets/2 - Same as fetch_committed_offsets/3, but works with a started brod_client.
fetch_committed_offsets/3 - Fetch committed offsets for ALL topics in the given consumer group.
fold/8 - Fold through messages in a partition.
get_consumer/3
get_metadata/1 - Fetch broker metadata. Returns the message body of the metadata response.
get_metadata/2 - Fetch broker/topic metadata. Returns the message body of the metadata response.
get_metadata/3 - Fetch broker/topic metadata. Returns the message body of the metadata response.
get_partitions_count/2 - Get the number of partitions for a given topic.
get_producer/3 - Equivalent to brod_client:get_producer/3.
list_all_groups/2 - List ALL consumer groups in the given Kafka cluster.
list_groups/2 - List consumer groups in the given group coordinator broker.
main/1
produce/2 - Equivalent to produce(Pid, 0, <<>>, Value).
produce/3 - Produce one message if Value is a binary or an iolist, otherwise a batch.
produce/5 - Produce one message if Value is a binary or an iolist, otherwise a batch.
produce_cb/4 - Same as produce/3, except the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka.
produce_cb/6 - Same as produce/5, except the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka.
produce_no_ack/5 - Find the partition worker and send a message without any ack.
produce_sync/2 - Equivalent to produce_sync(Pid, 0, <<>>, Value).
produce_sync/3 - Sync version of produce/3.
produce_sync/5 - Sync version of produce/5; does not return until a response is received from Kafka, unless the producer is started with required_acks set to 0, in which case it returns once the messages are buffered in the producer process.
produce_sync_offset/5 - Version of produce_sync/5 that returns the offset assigned by Kafka; if the producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.
resolve_offset/3 - Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1).
resolve_offset/4 - Resolve semantic offset or timestamp to real offset.
resolve_offset/5 - Resolve semantic offset or timestamp to real offset.
resolve_offset/6 - Resolve semantic offset or timestamp to real offset.
start/0 - Start the brod application.
start/2 - Application behaviour callback.
start_client/1 - Equivalent to start_client(BootstrapEndpoints, brod_default_client).
start_client/2 - Equivalent to start_client(BootstrapEndpoints, ClientId, []).
start_client/3 - Start a client.
start_consumer/3 - Dynamically start a topic consumer.
start_link_client/1 - Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
start_link_client/2 - Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
start_link_client/3
start_link_group_subscriber/7
start_link_group_subscriber/8
start_link_group_subscriber_v2/1 - Start group_subscriber_v2.
start_link_topic_subscriber/1
start_link_topic_subscriber/5 - (Deprecated.) Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).
start_link_topic_subscriber/6 - (Deprecated.) Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).
start_link_topic_subscriber/7 - (Deprecated.)
start_producer/3 - Dynamically start a per-topic producer.
stop/0 - Stop the brod application.
stop/1 - Application behaviour callback.
stop_client/1 - Stop a client.
subscribe/3
subscribe/5 - Subscribe to a data stream from the given topic-partition.
sync_produce_request/1 - Block waiting for a sent produce request to be acked by Kafka.
sync_produce_request/2
sync_produce_request_offset/1 - As sync_produce_request/1, but also returns the assigned offset; see produce_sync_offset/5.
sync_produce_request_offset/2
unsubscribe/1 - Unsubscribe the current subscriber.
unsubscribe/2 - Unsubscribe the current subscriber.
unsubscribe/3 - Unsubscribe the current subscriber.
unsubscribe/4 - Unsubscribe the current subscriber.

Function Details

connect_group_coordinator/3

connect_group_coordinator(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, pid()} | {error, any()}

Connect to consumer group coordinator broker.

Done in the following steps:
  1. Connect to any of the given bootstrap endpoints
  2. Send a group_coordinator request to resolve the group coordinator endpoint
  3. Connect to the resolved endpoint and return the connection pid
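
A minimal usage sketch (the endpoint, connection option and group name below are hypothetical):

    {ok, Conn} = brod:connect_group_coordinator(
                   [{"localhost", 9092}],      %% bootstrap endpoints
                   [{connect_timeout, 5000}],  %% conn_config() proplist
                   <<"my-group">>),
    %% ... issue group coordination requests over Conn ...
    %% close the connection when done (e.g. via kpro_connection:stop/1)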

connect_leader/4

connect_leader(Hosts::[endpoint()], Topic::topic(), Partition::partition(), ConnConfig::conn_config()) -> {ok, pid()}

Connect to the partition leader.

consume_ack/2

consume_ack(ConsumerPid::pid(), Offset::offset()) -> ok | {error, any()}

consume_ack/4

consume_ack(Client::client(), Topic::topic(), Partition::partition(), Offset::offset()) -> ok | {error, any()}

create_topics/3

create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32(), validate_only => boolean()}) -> ok | {ok, kpro:struct()} | {error, any()}

Equivalent to create_topics(Hosts, TopicConfigs, RequestConfigs, []).

create_topics/4

create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32(), validate_only => boolean()}, Options::conn_config()) -> ok | {ok, kpro:struct()} | {error, any()}

Create topic(s) in Kafka. Returns the message body of the create_topics response. See kpro_schema.erl for struct details.
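
A sketch of creating one topic; the topic_config() fields shown (name, num_partitions, replication_factor, assignments, configs) follow kpro_schema.erl and may vary with the negotiated API version:

    TopicConfigs = [#{name => <<"test-topic">>,
                      num_partitions => 3,
                      replication_factor => 2,
                      assignments => [],
                      configs => []}],
    case brod:create_topics([{"localhost", 9092}],
                            TopicConfigs,
                            #{timeout => 5000},
                            _ConnOpts = []) of
      ok -> ok;
      {ok, _Result} -> ok;             %% response body, see kpro_schema.erl
      {error, Reason} -> {error, Reason}
    end.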

delete_topics/3

delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer()) -> ok | {ok, kpro:struct()} | {error, any()}

Equivalent to delete_topics(Hosts, Topics, Timeout, []).

delete_topics/4

delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer(), Options::conn_config()) -> ok | {ok, kpro:struct()} | {error, any()}

Delete topic(s) from Kafka. Returns the message body of the delete_topics response. See kpro_schema.erl for struct details.

describe_groups/3

describe_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config(), IDs::[group_id()]) -> {ok, [kpro:struct()]} | {error, any()}

Describe consumer groups. The given consumer group IDs should all be managed by the coordinator broker running at the given endpoint; otherwise error codes will be returned in the result structs. Returns the describe_groups response body field named groups. See kpro_schema.erl for struct details.

fetch/4

fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::integer()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}

Fetch a single message set from the given topic-partition. The first argument can either be an already-established connection to the partition leader, or {Endpoints, ConnConfig} so that a new connection is established before the fetch.

fetch/5

fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}

Fetch a single message set from the given topic-partition. The first argument can either be an already-established connection to the partition leader, or {Endpoints, ConnConfig} so that a new connection is established before the fetch.
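
A sketch fetching from offset 0 of partition 0, with a bootstrap tuple as the first argument (endpoints and topic are hypothetical):

    {ok, {HwOffset, Messages}} =
      brod:fetch({[{"localhost", 9092}], []},  %% bootstrap(): {Endpoints, ConnConfig}
                 <<"test-topic">>, 0, 0,
                 #{max_bytes => 1024 * 1024}),
    io:format("high watermark: ~p, fetched ~p messages~n",
              [HwOffset, length(Messages)]).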

fetch/7

fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer()) -> {ok, [message()]} | {error, any()}

This function is deprecated. Equivalent to fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, []).

fetch/8

fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer(), ConnConfig::conn_config()) -> {ok, [message()]} | {error, any()}

This function is deprecated: Fetch a single message set from the given topic-partition.

fetch_committed_offsets/2

fetch_committed_offsets(Client::client(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}

Same as fetch_committed_offsets/3, but works with a started brod_client

fetch_committed_offsets/3

fetch_committed_offsets(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}

Fetch committed offsets for ALL topics in the given consumer group. Returns the responses field of the offset_fetch response. See kpro_schema.erl for struct details.

fold/8

fold(Bootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts(), Acc, Fun::fold_fun(Acc), Limits::fold_limits()) -> fold_result()

Fold through messages in a partition. Works like lists:foldl/2, but stops on the conditions below:
  * Always returns after reaching the high watermark offset
  * Returns after the given message count limit is reached
  * Returns after the given Kafka offset is reached
  * Returns if the fold function returns an {error, Reason} tuple
NOTE: Exceptions from evaluating the fold function are not caught.
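
A sketch that sums message value sizes in partition 0, stopping after at most 10000 messages (the #kafka_message{} record comes from brod.hrl; endpoints and topic are hypothetical):

    SumBytes = fun(#kafka_message{value = V}, Acc) ->
                   {ok, Acc + byte_size(V)}
               end,
    {Bytes, NextOffset, StopReason} =
      brod:fold({[{"localhost", 9092}], []},   %% bootstrap()
                <<"test-topic">>, 0,
                _BeginOffset = 0,
                #{},                           %% fetch opts
                _Acc0 = 0,
                SumBytes,
                #{message_count => 10000}),
    io:format("~p bytes, continue from offset ~p (~p)~n",
              [Bytes, NextOffset, StopReason]).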

get_consumer/3

get_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}

get_metadata/1

get_metadata(Hosts::[endpoint()]) -> {ok, kpro:struct()} | {error, any()}

Fetch broker metadata. Returns the message body of the metadata response. See kpro_schema.erl for details.

get_metadata/2

get_metadata(Hosts::[endpoint()], Topics::all | [topic()]) -> {ok, kpro:struct()} | {error, any()}

Fetch broker/topic metadata. Returns the message body of the metadata response. See kpro_schema.erl for struct details.

get_metadata/3

get_metadata(Hosts::[endpoint()], Topics::all | [topic()], Options::conn_config()) -> {ok, kpro:struct()} | {error, any()}

Fetch broker/topic metadata. Returns the message body of the metadata response. See kpro_schema.erl for struct details.
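
For example, to fetch metadata for a single topic over a plaintext connection (endpoint and topic are hypothetical):

    {ok, Meta} = brod:get_metadata([{"localhost", 9092}], [<<"test-topic">>], []),
    %% Meta is a kpro:struct(); see kpro_schema.erl for the exact fields
    io:format("~p~n", [Meta]).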

get_partitions_count/2

get_partitions_count(Client::client(), Topic::topic()) -> {ok, pos_integer()} | {error, any()}

Get the number of partitions for a given topic. Higher-level producers may need the partition count to find the partition producer pid, if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, round-robin or consistent hashing) across the partitions.
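
For example, a sketch of consistent hashing on the message key (the client id and topic are hypothetical):

    {ok, Count} = brod:get_partitions_count(my_client, <<"test-topic">>),
    Key = <<"some-key">>,
    Partition = erlang:phash2(Key, Count),   %% deterministic key -> partition
    ok = brod:produce_sync(my_client, <<"test-topic">>, Partition, Key, <<"value">>).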

get_producer/3

get_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}

Equivalent to brod_client:get_producer/3.

list_all_groups/2

list_all_groups(Endpoints::[endpoint()], ConnCfg::conn_config()) -> [{endpoint(), [cg()] | {error, any()}}]

List ALL consumer groups in the given Kafka cluster. NOTE: An exception is raised if connecting to any of the coordinator brokers fails.

list_groups/2

list_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config()) -> {ok, [cg()]} | {error, any()}

List consumer groups in the given group coordinator broker.

main/1

main(X) -> any()

produce/2

produce(Pid::pid(), Value::value()) -> {ok, call_ref()} | {error, any()}

Equivalent to produce(Pid, 0, <<>>, Value).

produce/3

produce(ProducerPid::pid(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}

Produce one message if Value is a binary or an iolist. Otherwise, send a batch if Value is a (nested) key-value list or a list of maps; in this case Key is discarded (only the keys in the key-value list are sent to Kafka). The pid should be a partition producer pid, NOT a client pid. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
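
A sketch of producing asynchronously and then blocking on the ack (client id and topic are hypothetical):

    {ok, ProducerPid} = brod:get_producer(my_client, <<"test-topic">>, _Partition = 0),
    {ok, CallRef} = brod:produce(ProducerPid, <<"key">>, <<"value">>),
    %% either match the #brod_produce_reply{} message in a receive block,
    %% or block on the call reference:
    ok = brod:sync_produce_request(CallRef, _Timeout = 5000).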

produce/5

produce(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}

Produce one message if Value is a binary or an iolist. Otherwise, send a batch if Value is a (nested) key-value list or a list of maps; in this case Key is used only for partitioning, or discarded if the 3rd argument is a partition number instead of a partitioner callback. This function first looks up the producer pid, then calls produce/3 to do the real work. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.

produce_cb/4

produce_cb(ProducerPid::pid(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {error, any()}

Same as produce/3, except that the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka.

produce_cb/6

produce_cb(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {ok, partition()} | {error, any()}

Same as produce/5, except that the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka. When the 3rd argument is not a partition number, the partition is returned to the caller as {ok, Partition} so the caller can correlate the callback.
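
A sketch using the built-in hash partitioner and an ack callback (client id and topic are hypothetical):

    AckCb = fun(Partition, BaseOffset) ->
                io:format("acked: partition ~p base offset ~p~n",
                          [Partition, BaseOffset])
            end,
    {ok, _Partition} =
      brod:produce_cb(my_client, <<"test-topic">>, hash, <<"key">>, <<"v">>, AckCb).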

produce_no_ack/5

produce_no_ack(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}

Find the partition worker and send the message without any ack. NOTE: This call has no back-pressure to the caller; excessive usage may cause BEAM to run out of memory.

produce_sync/2

produce_sync(Pid::pid(), Value::value()) -> ok

Equivalent to produce_sync(Pid, 0, <<>>, Value).

produce_sync/3

produce_sync(Pid::pid(), Key::key(), Value::value()) -> ok | {error, any()}

Sync version of produce/3.

This function does not return until a response is received from Kafka. However, if the producer is started with required_acks set to 0, it returns once the messages are buffered in the producer process.

produce_sync/5

produce_sync(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}

Sync version of produce/5. This function does not return until a response is received from Kafka; however, if the producer is started with required_acks set to 0, it returns once the messages are buffered in the producer process.
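
A sketch producing a two-message batch with the random partitioner (client id and topic are hypothetical):

    Batch = [#{key => <<"k1">>, value => <<"v1">>},
             #{key => <<"k2">>, value => <<"v2">>}],
    ok = brod:produce_sync(my_client, <<"test-topic">>, random, <<>>, Batch).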

produce_sync_offset/5

produce_sync_offset(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}

Version of produce_sync/5 that returns the offset assigned by Kafka. If the producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.

resolve_offset/3

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition()) -> {ok, offset()} | {error, any()}

Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1).

resolve_offset/4

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time()) -> {ok, offset()} | {error, any()}

Resolve semantic offset or timestamp to real offset.

resolve_offset/5

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config()) -> {ok, offset()} | {error, any()}

Resolve semantic offset or timestamp to real offset.

resolve_offset/6

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config(), Opts::#{timeout => kpro:int32()}) -> {ok, offset()} | {error, any()}

Resolve semantic offset or timestamp to real offset.
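
For example, to resolve the latest offset and the first offset at or after a timestamp (endpoint and topic are hypothetical):

    {ok, Latest} =
      brod:resolve_offset([{"localhost", 9092}], <<"test-topic">>, 0, latest, []),
    OneHourAgo = os:system_time(millisecond) - 60 * 60 * 1000,
    {ok, Offset} =
      brod:resolve_offset([{"localhost", 9092}], <<"test-topic">>, 0, OneHourAgo, []).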

start/0

start() -> ok | no_return()

Start brod application.

start/2

start(StartType, StartArgs) -> any()

Application behaviour callback

start_client/1

start_client(BootstrapEndpoints::[endpoint()]) -> ok | {error, any()}

Equivalent to start_client(BootstrapEndpoints, brod_default_client).

start_client/2

start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> ok | {error, any()}

Equivalent to start_client(BootstrapEndpoints, ClientId, []).

start_client/3

start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> ok | {error, any()}

Start a client.

BootstrapEndpoints: Kafka cluster endpoints. These can be any of the brokers in the cluster; they do not necessarily have to be the leader of any partition, e.g. a load-balanced entry point to the remote Kafka cluster.

ClientId: Atom to identify the client process.

Config is a proplist of client options; see client_config() (brod_client:config()).

Connection options can be added to the same proplist. See kpro_connection.erl in kafka_protocol for details.
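
A sketch of starting a named client; the endpoints are hypothetical, and the options shown (auto_start_producers, ssl) are examples of a client option and a connection option respectively:

    ok = brod:start_client([{"kafka-1.example.com", 9092},
                            {"kafka-2.example.com", 9092}],
                           my_client,
                           [{auto_start_producers, true},  %% client option
                            {ssl, false}]).                %% connection option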

start_consumer/3

start_consumer(Client::client(), TopicName::topic(), ConsumerConfig::consumer_config()) -> ok | {error, any()}

Dynamically start a topic consumer.

See also: brod_consumer, for details about the consumer config.

start_link_client/1

start_link_client(BootstrapEndpoints::[endpoint()]) -> {ok, pid()} | {error, any()}

Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).

start_link_client/2

start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> {ok, pid()} | {error, any()}

Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).

start_link_client/3

start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> {ok, pid()} | {error, any()}

start_link_group_subscriber/7

start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

See also: brod_group_subscriber:start_link/7.

start_link_group_subscriber/8

start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

See also: brod_group_subscriber:start_link/8.

start_link_group_subscriber_v2/1

start_link_group_subscriber_v2(Config::brod_group_subscriber_v2:subscriber_config()) -> {ok, pid()} | {error, any()}

Start group_subscriber_v2

start_link_topic_subscriber/1

start_link_topic_subscriber(Config::brod_topic_subscriber:topic_subscriber_config()) -> {ok, pid()} | {error, any()}

See also: brod_topic_subscriber:start_link/1.

start_link_topic_subscriber/5

start_link_topic_subscriber(Client::client(), Topic::topic(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).

This function is deprecated: Please use start_link_topic_subscriber/1 instead

start_link_topic_subscriber/6

start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).

This function is deprecated: Please use start_link_topic_subscriber/1 instead

start_link_topic_subscriber/7

start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

This function is deprecated: Please use start_link_topic_subscriber/1 instead

See also: brod_topic_subscriber:start_link/7.

start_producer/3

start_producer(Client::client(), TopicName::topic(), ProducerConfig::producer_config()) -> ok | {error, any()}

Dynamically start a per-topic producer.

See also: brod_producer:start_link/4.

stop/0

stop() -> ok

Stop brod application.

stop/1

stop(State) -> any()

Application behaviour callback

stop_client/1

stop_client(Client::client()) -> ok

Stop a client.

subscribe/3

subscribe(ConsumerPid::pid(), SubscriberPid::pid(), Options::consumer_options()) -> ok | {error, any()}

subscribe/5

subscribe(Client::client(), SubscriberPid::pid(), Topic::topic(), Partition::partition(), Options::consumer_options()) -> {ok, pid()} | {error, any()}

Subscribe to a data stream from the given topic-partition.

If {error, Reason} is returned, the caller should perhaps retry later.

{ok, ConsumerPid} is returned on success. The caller may want to monitor the consumer pid and re-subscribe should the ConsumerPid crash.

Upon successful subscription the subscriber process should expect messages of pattern: {ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}.

Use -include_lib("brod/include/brod.hrl") to access the records.

In case #kafka_fetch_error{} is received the subscriber should re-subscribe itself to resume the data stream.
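
A sketch of a subscriber loop handling both message types; it assumes the brod.hrl records are included and a consumer has been started with brod:start_consumer/3 (client, topic and partition are hypothetical):

    loop(Client, Topic, Partition, ConsumerPid) ->
      receive
        {ConsumerPid, #kafka_message_set{messages = Messages}} ->
          %% process Messages, then ack the last offset so the consumer
          %% continues to fetch
          LastOffset = lists:last([O || #kafka_message{offset = O} <- Messages]),
          ok = brod:consume_ack(ConsumerPid, LastOffset),
          loop(Client, Topic, Partition, ConsumerPid);
        {ConsumerPid, #kafka_fetch_error{error_code = Code}} ->
          %% re-subscribe to resume the data stream
          io:format("fetch error ~p, re-subscribing~n", [Code]),
          {ok, NewPid} = brod:subscribe(Client, self(), Topic, Partition, []),
          loop(Client, Topic, Partition, NewPid)
      end.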

sync_produce_request/1

sync_produce_request(CallRef::call_ref()) -> ok | {error, Reason::any()}

Block waiting for a sent produce request to be acked by Kafka.

sync_produce_request/2

sync_produce_request(CallRef::call_ref(), Timeout::timeout()) -> ok | {error, Reason::any()}

sync_produce_request_offset/1

sync_produce_request_offset(CallRef::call_ref()) -> {ok, offset()} | {error, Reason::any()}

As sync_produce_request/1, but also returns the assigned offset. See produce_sync_offset/5.

sync_produce_request_offset/2

sync_produce_request_offset(CallRef::call_ref(), Timeout::timeout()) -> {ok, offset()} | {error, Reason::any()}

unsubscribe/1

unsubscribe(ConsumerPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber. Assuming the subscriber is self().

unsubscribe/2

unsubscribe(ConsumerPid::pid(), SubscriberPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.

unsubscribe/3

unsubscribe(Client::client(), Topic::topic(), Partition::partition()) -> ok | {error, any()}

Unsubscribe the current subscriber. Assuming the subscriber is self().

unsubscribe/4

unsubscribe(Client::client(), Topic::topic(), Partition::partition(), SubscriberPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.


Generated by EDoc