brod (brod v4.3.3)
Summary
Types
Connection configuration that will be passed to kpro calls.
Consumer configuration.
fold always returns when it reaches the high watermark offset. fold also returns when any of the limits is hit.
A record with topic, partition, high_wm_offset (max offset of the partition), and messages.
A record with call_ref, base_offset, and result.
Functions
See also: brod_transaction:abort/1.
See also: brod_transaction:commit/1.
Connect to consumer group coordinator broker.
Acknowledge that one or more messages have been processed.
Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
Create topic(s) in kafka.
Delete topic(s) from kafka.
Describe consumer groups.
Fetch a single message set from the given topic-partition.
Fetch a single message set from the given topic-partition.
Same as fetch_committed_offsets/3, but works with a started brod_client.
Fetch committed offsets for ALL topics in the given consumer group.
Fold through messages in a partition.
Fetch broker metadata for all topics.
Fetch broker metadata for the given topics.
Fetch broker metadata for the given topics using the given connection options.
Get number of partitions for a given topic.
The same as get_partitions_count(Client, Topic), but ensured not to auto-create topics in Kafka even when Kafka has topic auto-creation configured.
Equivalent to brod_client:get_producer(Client, Topic, Partition).
List ALL consumer groups in the given kafka cluster.
Equivalent to produce(Pid, <<>>, Value).
Produce one or more messages.
Produce one or more messages.
Same as produce/3, only the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).
Same as produce/5, only the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).
Find the partition worker and send message without any ack.
Equivalent to produce_sync(Pid, <<>>, Value).
Sync version of produce/3.
Sync version of produce/5.
Version of produce_sync/5 that returns the offset assigned by Kafka.
Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).
Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).
Resolve semantic offset or timestamp to real offset.
Resolve semantic offset or timestamp to real offset.
Equivalent to start_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_client(BootstrapEndpoints, ClientId, []).
Start a client (brod_client).
Dynamically start topic consumer(s) and register it in the client.
Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
See also: brod_group_subscriber:start_link/7.
See also: brod_group_subscriber:start_link/8.
See also: brod_topic_subscriber:start_link/7.
Dynamically start a per-topic producer and register it in the client.
Subscribe to a data stream from the given consumer.
Subscribe to a data stream from the given topic-partition.
Block and wait for a sent produce request to be acked by Kafka.
As sync_produce_request/2, but also returning the assigned offset.
Equivalent to brod_transaction:start_link/3.
See also: brod_transaction:add_offsets/3.
See also: brod_transaction_processor:do/3.
See also: brod_transaction:produce/5.
See also: brod_transaction:produce/5.
Unsubscribe the current subscriber.
Unsubscribe the current subscriber.
Types
-type batch_input() :: [msg_input()].
-type bootstrap() :: [endpoint()] | {[endpoint()], client_config()}.
-type cg() :: #brod_cg{id :: brod:group_id(), protocol_type :: brod:cg_protocol_type()}.
-type cg_protocol_type() :: binary().
-type client_config() :: brod_client:config().
-type client_id() :: atom().
-type compression() :: no_compression | gzip | snappy.
-type conn_config() :: [{atom(), term()}] | kpro:conn_config().
Connection configuration that will be passed to kpro calls.
For more info, see the kpro_connection:config() type.
-type connection() :: kpro:connection().
-type consumer_config() :: [{begin_offset, offset_time()} | {min_bytes, non_neg_integer()} | {max_bytes, non_neg_integer()} | {max_wait_time, integer()} | {sleep_timeout, integer()} | {prefetch_count, integer()} | {prefetch_bytes, non_neg_integer()} | {offset_reset_policy, brod_consumer:offset_reset_policy()} | {size_stat_window, non_neg_integer()} | {isolation_level, brod_consumer:isolation_level()} | {share_leader_conn, boolean()}].
Consumer configuration.
The meaning of the options is documented at brod_consumer:start_link/5.
-type error_code() :: kpro:error_code().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
fold always returns when it reaches the high watermark offset. fold also returns when any of the limits is hit.
-type fold_limits() :: #{message_count => pos_integer(), reach_offset => offset()}.
-type fold_result() :: {fold_acc(), OffsetToContinue :: offset(), fold_stop_reason()}.
-type fold_stop_reason() :: reached_end_of_partition | reached_message_count_limit | reached_target_offset | {error, any()}.
-type group_config() :: proplists:proplist().
-type group_generation_id() :: non_neg_integer().
-type group_id() :: kpro:group_id().
-type group_member() :: {group_member_id(), #kafka_group_member_metadata{version :: non_neg_integer(), topics :: [brod:topic()], user_data :: binary()}}.
-type group_member_id() :: binary().
-type hostname() :: kpro:hostname().
-type key() :: undefined | binary().
-type message() :: kpro:message().
-type message_set() :: #kafka_message_set{topic :: brod:topic(), partition :: brod:partition(), high_wm_offset :: integer(), messages :: [brod:message()] | kpro:incomplete_batch()}.
A record with topic, partition, high_wm_offset (max offset of the partition), and messages.
See the definition for more information.
-type msg_input() :: kpro:msg_input().
-type msg_ts() :: kpro:msg_ts().
-type offset() :: kpro:offset().
-type offset_time() :: msg_ts() | earliest | latest.
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type partition_fun() :: fun((topic(), pos_integer(), key(), value()) -> {ok, partition()}).
-type partitioner() :: partition_fun() | random | hash.
-type portnum() :: pos_integer().
-type produce_reply() :: #brod_produce_reply{call_ref :: brod:call_ref(), base_offset :: undefined | brod:offset(), result :: brod:produce_result()}.
A record with call_ref, base_offset, and result.
See the definition for more information.
-type produce_result() :: brod_produce_req_buffered | brod_produce_req_acked.
-type producer_config() :: brod_producer:config().
-type received_assignments() :: [#brod_received_assignment{topic :: brod:topic(), partition :: brod:partition(), begin_offset :: undefined | brod:offset() | {begin_offset, brod:offset_time()}}].
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type transaction() :: brod_transaction:transaction().
-type transaction_config() :: brod_transaction:transaction_config().
-type transactional_id() :: brod_transaction:transactional_id().
-type txn_do_options() :: brod_transaction_processor:do_options().
-type txn_function() :: brod_transaction_processor:process_function().
-type value() :: undefined | iodata() | {msg_ts(), binary()} | [{key(), value()}] | [{msg_ts(), key(), value()}] | kpro:msg_input() | kpro:batch_input().
Functions
-spec abort(transaction()) -> ok | {error, any()}.
See also: brod_transaction:abort/1.
-spec commit(transaction()) -> ok | {error, any()}.
See also: brod_transaction:commit/1.
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) -> {ok, pid()} | {error, any()}.
Connect to consumer group coordinator broker.
Done in steps:
- Connect to any of the given bootstrap endpoints
- Send group_coordinator_request to resolve group coordinator endpoint
- Connect to the resolved endpoint and return the connection pid
-spec connect_leader([endpoint()], topic(), partition(), conn_config()) -> {ok, pid()}.
Equivalent to brod_consumer:ack(ConsumerPid, Offset).
See consume_ack/4 for more information.
Acknowledge that one or more messages have been processed.
brod_consumer sends message-sets to the subscriber process and keeps the messages in a 'pending' queue. The subscriber may choose to ack any received offset. Acknowledging a greater offset will automatically acknowledge the messages before this offset. For example, if messages [1, 2, 3, 4] have been sent (as one or more message-sets) to the subscriber, the subscriber may acknowledge with offset 3 to indicate that the first three messages are successfully processed, leaving behind only message 4 pending.
The 'pending' queue has a size limit (see the prefetch_count consumer config), which provides a mechanism to handle back-pressure. If there are too many messages pending on ack, the consumer will stop fetching new ones so the subscriber won't get overwhelmed.
Note, there is no range check done for the acknowledging offset, meaning if offsets [M, N] are pending to be acknowledged, acknowledging with Offset > N will cause all offsets to be removed from the pending queue, and acknowledging with Offset < M has no effect.
Use this function only with plain partition subscribers (i.e., when you manually call subscribe/5). Behaviours like brod_topic_subscriber have their own way to ack messages.
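The acknowledgement rules above can be illustrated with a small stand-alone model. This is only a sketch: the module name ack_model and the representation of the pending queue as an ascending list of offsets are assumptions made for illustration, not how brod_consumer stores its state.

```erlang
-module(ack_model).
-export([ack/2]).

%% Hypothetical model of the 'pending' queue: a list of offsets in
%% ascending order. Acking Offset drops every pending offset =< Offset.
%% No range check is done, mirroring the note above.
-spec ack(integer(), [integer()]) -> [integer()].
ack(Offset, Pending) ->
    lists:dropwhile(fun(O) -> O =< Offset end, Pending).
```

With pending offsets [1, 2, 3, 4], ack_model:ack(3, Pending) leaves [4]; acking above the last pending offset empties the list, and acking below the first one leaves it unchanged.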
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()}) -> ok | {error, any()}.
Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()}, conn_config()) -> ok | {error, any()}.
Create topic(s) in kafka.
TopicConfigs is a list of topic configurations. A topic configuration is a map (or tuple list for backward compatibility) with the following keys (all of them are required):
- name: The topic name.
- num_partitions: The number of partitions to create in the topic, or -1 if we are either specifying a manual partition assignment or using the default partitions.
- replication_factor: The number of replicas to create for each partition in the topic, or -1 if we are either specifying a manual partition assignment or using the default replication factor.
- assignments: The manual partition assignment, or the empty list if we let Kafka automatically assign them. It is a list of maps (or tuple lists) with the following keys: partition_index and broker_ids (a list of brokers to place the partition on).
- configs: The custom topic configurations to set. It is a list of maps (or tuple lists) with keys name and value. You can find possible options in the Kafka documentation.
> TopicConfigs = [
#{
name => <<"my_topic">>,
num_partitions => 1,
replication_factor => 1,
assignments => [],
configs => [ #{name => <<"cleanup.policy">>, value => "compact"}]
}
].
> brod:create_topics([{"localhost", 9092}], TopicConfigs, #{timeout => 1000}, []).
ok
-spec delete_topics([endpoint()], [topic()], pos_integer()) -> ok | {error, any()}.
Equivalent to delete_topics(Hosts, Topics, Timeout, []).
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) -> ok | {error, any()}.
Delete topic(s) from kafka.
Example: > brod:delete_topics([{"localhost", 9092}], ["my_topic"], 5000, []).
ok
-spec describe_groups(endpoint(), conn_config(), [group_id()]) -> {ok, [kpro:struct()]} | {error, any()}.
Describe consumer groups.
The given consumer group IDs should all be managed by the coordinator-broker running at the given endpoint. Otherwise error codes will be returned in the result structs. Returns the describe_groups response body field named groups. See kpro_schema.erl for struct details.
-spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), integer()) -> {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
Fetch a single message set from the given topic-partition.
Calls fetch/5 with the default options: max_wait_time = 1 second, min_bytes = 1 B, and max_bytes = 2^20 B (1 MB).
See fetch/5 for more information.
-spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), offset(), fetch_opts()) -> {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
Fetch a single message set from the given topic-partition.
The first arg can either be an already established connection to leader, or {Endpoints, ConnConfig} (or just Endpoints) so as to establish a new connection before fetch.
The fourth argument is the start offset of the query. Messages with offset greater or equal will be fetched.
You can also pass options for the fetch query. See the kpro_req_lib:fetch_opts() type for their documentation. Only max_wait_time, min_bytes, max_bytes, and isolation_level options are currently supported. The defaults are the same as documented in the linked type, except for min_bytes which defaults to 1 in brod. Note that max_bytes will be rounded up so that full messages are retrieved. For example, if you specify max_bytes = 42 and there are three messages of size 40 bytes, two of them will be fetched.
On success, the function returns the messages along with the last stable offset (when using read_committed mode, the last committed offset) or the high watermark offset (offset of the last message that was successfully copied to all replicas, incremented by 1), whichever is lower. In essence, this is the offset up to which it was possible to read the messages at the time of fetching. This is similar to what resolve_offset/6 with latest returns. You can use this information to determine how far from the end of the topic you currently are. Note that when you use this offset as the start offset for a subsequent call, an empty list of messages will be returned (assuming the topic hasn't changed, e.g. no new message arrived). Only when you use an offset greater than this one, {error, offset_out_of_range} will be returned.
Note also that Kafka batches messages in a message set only up to the end of a topic segment in which the first retrieved message is, so there may actually be more messages behind the last fetched offset even if the fetched size is significantly less than max_bytes provided in fetch_opts(). See this issue for more details.
> brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 0, #{max_bytes => 1024}).
{ok,{2,
[{kafka_message,0,<<"some_key">>,<<"Hello world!">>,
create,1663940976473,[]},
{kafka_message,1,<<"another_key">>,<<"This is a message with offset 1.">>,
create,1663940996335,[]}]}}
> brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 2, #{max_bytes => 1024}).
{ok,{2,[]}}
> brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 3, #{max_bytes => 1024}).
{error,offset_out_of_range}
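As a rough illustration of using the returned offset to see how far from the end of the partition you are, the sketch below computes the next offset to fetch and the remaining distance. The module name fetch_progress and both helper functions are hypothetical; messages are matched as plain tuples shaped like the shell output above, whereas real code would normally use the #kafka_message{} record from brod.hrl.

```erlang
-module(fetch_progress).
-export([next_offset/2, lag/3]).

%% Messages are assumed to be tuples shaped like the shell output above:
%% {kafka_message, Offset, Key, Value, TsType, Ts, Headers}.

%% Offset to use as the start offset of the next fetch call.
next_offset(StartOffset, []) ->
    StartOffset;
next_offset(_StartOffset, Messages) ->
    element(2, lists:last(Messages)) + 1.

%% Number of offsets between the next fetch position and the offset
%% returned by fetch (HwOffset) at the time of fetching.
lag(StartOffset, HwOffset, Messages) ->
    HwOffset - next_offset(StartOffset, Messages).
```

For the first call in the transcript above (start offset 0, returned offset 2, two messages), the next offset is 2 and the lag is 0, which matches the empty result of the second call.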
fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes)
-spec fetch([endpoint()], topic(), partition(), offset(), non_neg_integer(), non_neg_integer(), pos_integer()) -> {ok, [message()]} | {error, any()}.
Equivalent to fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, []).
fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, ConnConfig)
-spec fetch([endpoint()], topic(), partition(), offset(), non_neg_integer(), non_neg_integer(), pos_integer(), conn_config()) -> {ok, [message()]} | {error, any()}.
-spec fetch_committed_offsets(client(), group_id()) -> {ok, [kpro:struct()]} | {error, any()}.
Same as fetch_committed_offsets/3, but works with a started brod_client.
-spec fetch_committed_offsets([endpoint()], conn_config(), group_id()) -> {ok, [kpro:struct()]} | {error, any()}.
Fetch committed offsets for ALL topics in the given consumer group.
Returns the responses field of the offset_fetch response. See kpro_schema.erl for struct details.
-spec fold(connection() | client_id() | bootstrap(), topic(), partition(), offset(), fetch_opts(), Acc, fold_fun(Acc), fold_limits()) -> fold_result() when Acc :: fold_acc().
Fold through messages in a partition.
Works like lists:foldl/3 but with the below stop conditions:
- Always return after reaching the high watermark offset
- Return after the given message count limit is reached
- Return after the given kafka offset is reached
- Return if the FoldFun returns an {error, Reason} tuple
NOTE: Exceptions from evaluating FoldFun are not caught.
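A minimal sketch of a fold that counts messages, following the fold/8 spec above. The module name fold_example, the limit of 1000 messages, the start offset 0 and the empty fetch options map are illustrative assumptions; the fold function is also assumed to return {ok, NewAcc} to continue, which the text above implies but does not spell out. Running count_messages/3 needs the brod application and a reachable broker.

```erlang
-module(fold_example).
-export([count_fun/0, count_messages/3]).

%% Fold function: ignores the message and increments the counter.
%% Assumed contract: {ok, NewAcc} continues, {error, Reason} stops the fold.
count_fun() ->
    fun(_Message, Count) -> {ok, Count + 1} end.

%% Counts at most 1000 messages starting at offset 0 (both values are
%% arbitrary choices for this sketch). Bootstrap may be a connection,
%% a client id, or a list of endpoints, as in the spec above.
count_messages(Bootstrap, Topic, Partition) ->
    Limits = #{message_count => 1000},
    {Count, NextOffset, StopReason} =
        brod:fold(Bootstrap, Topic, Partition, 0, #{}, 0, count_fun(), Limits),
    {Count, NextOffset, StopReason}.
```

The second element of the result is the offset to continue from, so a caller can pass it as the start offset of a later fold to resume where this one stopped.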
-spec get_metadata([endpoint()]) -> {ok, kpro:struct()} | {error, any()}.
Fetch broker metadata for all topics.
See get_metadata/3 for more information.
-spec get_metadata([endpoint()], all | [topic()]) -> {ok, kpro:struct()} | {error, any()}.
Fetch broker metadata for the given topics.
See get_metadata/3 for more information.
-spec get_metadata([endpoint()], all | [topic()], conn_config()) -> {ok, kpro:struct()} | {error, any()}.
Fetch broker metadata for the given topics using the given connection options.
The response differs in each version of the Metadata API call. The last supported Metadata API version is 2, so this will probably be used (if your Kafka supports it too). See kafka.bnf (search for MetadataResponseV2) for response schema with comments.
Beware that when auto.create.topics.enable is set to true in the broker configuration, fetching metadata with a concrete topic specified (in the Topics parameter) may cause creation of the topic when it does not exist. If you want a safe get_metadata call, always pass all as Topics and then filter them.
> brod:get_metadata([{"localhost", 9092}], [<<"my_topic">>], []).
{ok,#{brokers =>
[#{host => <<"localhost">>,node_id => 1,port => 9092,
rack => <<>>}],
cluster_id => <<"jTb2faMLRf6p21yD1y3v-A">>,
controller_id => 1,
topics =>
[#{error_code => no_error,is_internal => false,
name => <<"my_topic">>,
partitions =>
[#{error_code => no_error,
isr_nodes => [1],
leader_id => 1,partition_index => 1,
replica_nodes => [1]},
#{error_code => no_error,
isr_nodes => [1],
leader_id => 1,partition_index => 0,
replica_nodes => [1]}]}]}}
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Get number of partitions for a given topic.
The higher level producers may need the partition numbers to find the partition producer pid, if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions. NOTE: The partitions count is cached.
-spec get_partitions_count_safe(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
The same as get_partitions_count(Client, Topic), but ensured not to auto-create topics in Kafka even when Kafka has topic auto-creation configured.
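One way to distribute data over the partitions counted by get_partitions_count/2 is a key-hashing function matching the partition_fun() type listed above. The module name key_partitioner and the choice of erlang:phash2/2 are assumptions for this sketch; it is not brod's built-in hash partitioner.

```erlang
-module(key_partitioner).
-export([hash_partitioner/0]).

%% Returns a fun matching partition_fun():
%% fun((topic(), pos_integer(), key(), value()) -> {ok, partition()}).
%% erlang:phash2(Key, N) yields an integer in 0..N-1, so equal keys
%% always map to the same partition for a fixed partitions count.
hash_partitioner() ->
    fun(_Topic, PartitionsCount, Key, _Value) ->
        {ok, erlang:phash2(Key, PartitionsCount)}
    end.
```

According to the produce/5 spec, such a fun can be passed in place of a concrete partition number.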
-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason} when Reason :: client_down | {client_down, any()} | {producer_down, any()} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}.
Equivalent to brod_client:get_producer(Client, Topic, Partition).
-spec list_all_groups([endpoint()], conn_config()) -> [{endpoint(), [cg()] | {error, any()}}].
List ALL consumer groups in the given kafka cluster.
NOTE: An exception is raised if connecting to any of the coordinator brokers fails.
-spec list_groups(endpoint(), conn_config()) -> {ok, [cg()]} | {error, any()}.
Equivalent to produce(Pid, <<>>, Value).
Produce one or more messages.
See produce/5 for information about possible shapes of Value.
The pid should be a partition producer pid, NOT client pid.
The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
-spec produce(client(), topic(), partition() | partitioner(), key(), value()) -> {ok, call_ref()} | {error, any()}.
Produce one or more messages.
Value can have many different forms:
- binary(): Single message with key from the Key argument
- {brod:msg_ts(), binary()}: Single message with its create-time timestamp and key from Key
- #{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}: Single message; if this map does not have a key field, Key is used instead
- [{K, V} | {T, K, V}]: A batch, where V could be a nested list of such representation
- [#{key => K, value => V, ts => T, headers => [{_, _}]}]: A batch
When Value is a batch, the Key argument is only used as partitioner input and all messages are written on the same partition.
The ts field is dropped for kafka prior to version 0.10 (produce API version 0, magic version 0). The headers field is dropped for kafka prior to version 0.11 (produce API version 0-2, magic version 0-1).
Partition may be either a concrete partition (an integer) or a partitioner (see partitioner() for more info).
A producer for the particular topic has to be already started (by calling start_producer/3), unless you have specified auto_start_producers = true when starting the client.
This function first looks up the producer pid, then calls produce/3 to do the real work.
The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} (see the produce_reply() type) message after the produce request has been acked by Kafka.
> brod:produce(my_client, <<"my_topic">>, 0, "key", <<"Hello from erlang!">>).
{ok,{brod_call_ref,<0.83.0>,<0.133.0>,#Ref<0.3024768151.2556690436.92841>}}
> flush().
Shell got {brod_produce_reply,
{brod_call_ref,<0.83.0>,<0.133.0>,
#Ref<0.3024768151.2556690436.92841>},
12,brod_produce_req_acked}
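Instead of flush(), a process can wait for the reply with a selective receive. The sketch below matches the reply as a plain tuple shaped like the flush() output above; the module name produce_ack and the function await_ack/2 are hypothetical, and real code would normally match on the #brod_produce_reply{} record from brod.hrl or simply call sync_produce_request/2.

```erlang
-module(produce_ack).
-export([await_ack/2]).

%% Waits for the ack that belongs to CallRef. The reply is assumed to be
%% a tuple shaped like the flush() output above:
%% {brod_produce_reply, CallRef, BaseOffset, Result}.
await_ack(CallRef, Timeout) ->
    receive
        {brod_produce_reply, CallRef, BaseOffset, brod_produce_req_acked} ->
            {ok, BaseOffset}
    after Timeout ->
        {error, timeout}
    end.
```

Because CallRef is already bound in the pattern, replies for other produce calls stay in the mailbox untouched.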
-spec produce_cb(pid(), key(), value(), produce_ack_cb()) -> ok | {error, any()}.
Same as produce/3, only the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).
-spec produce_cb(client(), topic(), partition() | partitioner(), key(), value(), produce_ack_cb()) -> ok | {ok, partition()} | {error, any()}.
Same as produce/5, only the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).
Returns {ok, Partition} for the caller to correlate the callback when the 3rd arg is not a partition number.
-spec produce_no_ack(client(), topic(), partition() | partitioner(), key(), value()) -> ok | {error, any()}.
Find the partition worker and send message without any ack.
NOTE: This call has no back-pressure to the caller, excessive usage may cause BEAM to run out of memory.
Equivalent to produce_sync(Pid, <<>>, Value).
Sync version of produce/3.
If the producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
-spec produce_sync(client(), topic(), partition() | partitioner(), key(), value()) -> ok | {error, any()}.
Sync version of produce/5.
If the producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
-spec produce_sync_offset(client(), topic(), partition() | partitioner(), key(), value()) -> {ok, offset()} | {error, any()}.
Version of produce_sync/5 that returns the offset assigned by Kafka.
If the producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.
Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).
-spec resolve_offset([endpoint()], topic(), partition(), offset_time()) -> {ok, offset()} | {error, any()}.
Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).
-spec resolve_offset([endpoint()], topic(), partition(), offset_time(), conn_config()) -> {ok, offset()} | {error, any()}.
Resolve semantic offset or timestamp to real offset.
The same as resolve_offset/6 but the timeout is extracted from connection config.
-spec resolve_offset([endpoint()], topic(), partition(), offset_time(), conn_config(), #{timeout => kpro:int32()}) -> {ok, offset()} | {error, any()}.
Resolve semantic offset or timestamp to real offset.
The function returns the offset of the first message with the given timestamp, or of the first message after the given timestamp (in case no message matches the timestamp exactly), or -1 if the timestamp is newer than (>) all messages in the topic.
You can also use two semantic offsets instead of a timestamp: earliest gives you the offset of the first message in the topic and latest gives you the offset of the last message incremented by 1.
If the topic is empty, both earliest and latest return the same value (which is 0 unless some messages were deleted from the topic), and any timestamp returns -1.
Messages:
offset 0 1 2 3
timestamp 10 20 20 30
Calls:
resolve_offset(Endpoints, Topic, Partition, 5) → 0
resolve_offset(Endpoints, Topic, Partition, 10) → 0
resolve_offset(Endpoints, Topic, Partition, 13) → 1
resolve_offset(Endpoints, Topic, Partition, 20) → 1
resolve_offset(Endpoints, Topic, Partition, 31) → -1
resolve_offset(Endpoints, Topic, Partition, earliest) → 0
resolve_offset(Endpoints, Topic, Partition, latest) → 4
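The table above can be reproduced with a small stand-alone model of the lookup rules. This is an illustrative sketch only: the module name resolve_model is hypothetical, messages are represented as {Offset, Timestamp} pairs in ascending order with non-decreasing timestamps, and an empty partition is assumed to have had no deletions.

```erlang
-module(resolve_model).
-export([resolve/2]).

%% Messages: [{Offset, Timestamp}] in ascending offset order.
%% For an empty partition both semantic offsets resolve to 0
%% (assumption: nothing was ever deleted).
resolve(earliest, []) -> 0;
resolve(latest, []) -> 0;
resolve(earliest, [{Offset, _Ts} | _]) -> Offset;
resolve(latest, Messages) ->
    {Last, _Ts} = lists:last(Messages),
    Last + 1;
resolve(Ts, Messages) when is_integer(Ts) ->
    %% First message whose timestamp is equal to or after Ts, else -1.
    case lists:dropwhile(fun({_O, T}) -> T < Ts end, Messages) of
        [] -> -1;
        [{Offset, _} | _] -> Offset
    end.
```

Calling resolve_model:resolve/2 with the four messages from the table gives the same results as the calls listed above.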
-spec start() -> ok | no_return().
Equivalent to start_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_client(BootstrapEndpoints, ClientId, []).
-spec start_client([endpoint()], client_id(), client_config()) -> ok | {error, any()}.
Start a client (brod_client).
BootstrapEndpoints: Kafka cluster endpoints, can be any of the brokers in the cluster, which does not necessarily have to be the leader of any partition, e.g. a load-balanced entrypoint to the remote Kafka cluster.
ClientId: Atom to identify the client process.
Config is a proplist, possible values:
- restart_delay_seconds (optional, default=10): How long to wait between attempts to restart brod_client process when it crashes.
- get_metadata_timeout_seconds (optional, default=5): Return {error, timeout} from brod_client:get_xxx calls if responses for APIs such as metadata, find_coordinator are not received in time.
- reconnect_cool_down_seconds (optional, default=1): Delay this configured number of seconds before retrying to establish a new connection to the kafka partition leader.
- allow_topic_auto_creation (optional, default=true): By default, brod respects what is configured in the broker about topic auto-creation, i.e. whether auto.create.topics.enable is set in the broker configuration. However, if allow_topic_auto_creation is set to false in client config, brod will avoid sending metadata requests that may cause an auto-creation of the topic regardless of what broker config is.
- auto_start_producers (optional, default=false): If true, brod client will spawn a producer automatically when user is trying to call produce but did not call brod:start_producer explicitly. Can be useful for applications which don't know beforehand which topics they will be working with.
- default_producer_config (optional, default=[]): Producer configuration to use when auto_start_producers is true. See brod_producer:start_link/4 for details about producer config.
- unknown_topic_cache_ttl (optional, default=120000): For how long unknown_topic error will be cached, in ms.
Connection options can be added to the same proplist. See kpro_connection.erl in kafka_protocol for the details:
- ssl (optional, default=false): true | false | ssl:ssl_option(). true is translated to [] as ssl:ssl_option(), i.e. all default.
- sasl (optional, default=undefined): Credentials for SASL/Plain authentication. {mechanism(), Filename} or {mechanism(), UserName, Password} where mechanism can be atoms: plain (for "PLAIN"), scram_sha_256 (for "SCRAM-SHA-256") or scram_sha_512 (for SCRAM-SHA-512). Filename should be a file consisting of two lines, first line is the username and the second line is the password. Both Username and Password should be string() | binary().
- connect_timeout (optional, default=5000): Timeout when trying to connect to an endpoint.
- request_timeout (optional, default=240000, constraint: >= 1000): Timeout when waiting for a response, connection restart when timed out.
- query_api_versions (optional, default=true): Must be set to false to work with kafka versions prior to 0.10. When set to true, at connection start, brod will send a query request to get the broker supported API version ranges. When set to false, brod will always use the lowest supported API version when sending requests to kafka. Supported API version ranges can be found in: brod_kafka_apis:supported_versions/1
- extra_sock_opts (optional, default=[]): Extra socket options to tune socket performance. e.g. [{sndbuf, 1 bsl 20}]. More info
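A minimal sketch of a client configuration built from the options listed above. The module name client_setup, the client id my_client, the endpoint localhost:9092 and the chosen values are illustrative assumptions; start/0 needs the brod application to be running (for example after application:ensure_all_started(brod)).

```erlang
-module(client_setup).
-export([client_config/0, start/0]).

%% Client options and connection options share one proplist.
%% All values here are arbitrary examples, not recommendations.
client_config() ->
    [ {reconnect_cool_down_seconds, 10}
    , {auto_start_producers, true}
    , {default_producer_config, []}
    , {connect_timeout, 5000}
    ].

%% Starts a client registered as my_client against a local broker.
start() ->
    brod:start_client([{"localhost", 9092}], my_client, client_config()).
```

Options that are left out keep the defaults documented in the list above.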
-spec start_consumer(client(), topic(), consumer_config()) -> ok | {error, any()}.
Dynamically start topic consumer(s) and register it in the client.
A brod_consumer is started for each partition of the given topic. Note that you can have only one consumer per client-topic.
See brod_consumer:start_link/5 for details about consumer config.
Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
-spec start_link_client([endpoint()], client_id(), client_config()) -> {ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig, ConsumerConfig, CbModule, CbInitArg)
-spec start_link_group_subscriber(client(), group_id(), [topic()], group_config(), consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
See also: brod_group_subscriber:start_link/7.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig, ConsumerConfig, MessageType, CbModule, CbInitArg)
-spec start_link_group_subscriber(client(), group_id(), [topic()], group_config(), consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
See also: brod_group_subscriber:start_link/8.
-spec start_link_group_subscriber_v2(brod_group_subscriber_v2:subscriber_config()) -> {ok, pid()} | {error, any()}.
-spec start_link_topic_subscriber(brod_topic_subscriber:topic_subscriber_config()) -> {ok, pid()} | {error, any()}.
See also: brod_topic_subscriber:start_link/1.
start_link_topic_subscriber(Client, Topic, ConsumerConfig, CbModule, CbInitArg)
-spec start_link_topic_subscriber(client(), topic(), consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).
start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, CbModule, CbInitArg)
-spec start_link_topic_subscriber(client(), topic(), all | [partition()], consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).
start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, MessageType, CbModule, CbInitArg)
-spec start_link_topic_subscriber(client(), topic(), all | [partition()], consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
See also: brod_topic_subscriber:start_link/7.
-spec start_producer(client(), topic(), producer_config()) -> ok | {error, any()}.
Dynamically start a per-topic producer and register it in the client.
You have to start a producer for each topic you want to produce messages into, unless you have specified auto_start_producers = true when starting the client (in that case you don't have to call this function at all).
After starting the producer, you can call produce/5 and friends for producing messages.
You can read more about producers in the overview.
A client has to be already started before making this call (e.g. by calling start_client/3).
See brod_producer:start_link/4 for a list of available configuration options.
> brod:start_producer(my_client, <<"my_topic">>, [{max_retries, 5}]).
ok
-spec stop() -> ok.
-spec stop_client(client()) -> ok.
-spec subscribe(pid(), pid(), consumer_config()) -> ok | {error, any()}.
Subscribe to a data stream from the given consumer.
See subscribe/5 for more information.
-spec subscribe(client(), pid(), topic(), partition(), consumer_config()) -> {ok, pid()} | {error, any()}.
Subscribe to a data stream from the given topic-partition.
A client has to be already started (by calling start_client/3, one client per multiple topics is enough) and a corresponding consumer for the topic and partition as well (by calling start_consumer/3), before calling this function.
Caller may specify a set of options extending consumer config. See brod_consumer:subscribe/3 for more info on that.
If {error, Reason} is returned, the caller should perhaps retry later.
{ok, ConsumerPid} is returned on success. The caller may want to monitor the consumer pid and re-subscribe should the ConsumerPid crash.
Upon successful subscription the subscriber process should expect messages of pattern: {ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}.
Use -include_lib("brod/include/brod.hrl"). to access the records.
In case #kafka_fetch_error{} is received the subscriber should re-subscribe itself to resume the data stream.
To provide a mechanism to handle backpressure, brod requires all messages sent to a subscriber to be acked by calling consume_ack/4 after they are processed. If there are too many not-acked messages received by the subscriber, the consumer will stop fetching new ones so the subscriber won't get overwhelmed.
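The subscribe, receive, ack cycle described above might look roughly like the following sketch. The module name plain_subscriber is hypothetical, and the argument order consume_ack(Client, Topic, Partition, Offset) is an assumption, since the spec of consume_ack/4 is not shown on this page. To stay compilable without brod.hrl, records are matched as plain tuples (field order taken from the message_set() type and the fetch/5 shell output); real code would use the records instead. run/3 needs a started client and consumer.

```erlang
-module(plain_subscriber).
-export([run/3, last_offset/1]).

%% Offset of the last message in a non-empty list of
%% {kafka_message, Offset, Key, Value, TsType, Ts, Headers} tuples.
last_offset(Messages) ->
    element(2, lists:last(Messages)).

%% Subscribes the calling process and enters the receive loop.
run(Client, Topic, Partition) ->
    {ok, ConsumerPid} = brod:subscribe(Client, self(), Topic, Partition, []),
    loop(Client, Topic, Partition, ConsumerPid).

loop(Client, Topic, Partition, ConsumerPid) ->
    receive
        {ConsumerPid, {kafka_message_set, Topic, Partition, _HighWm, Messages}}
          when is_list(Messages), Messages =/= [] ->
            %% Process Messages here, then ack the highest offset;
            %% this also acks every message before it.
            ok = brod:consume_ack(Client, Topic, Partition, last_offset(Messages)),
            loop(Client, Topic, Partition, ConsumerPid);
        {ConsumerPid, Error} when element(1, Error) =:= kafka_fetch_error ->
            %% Re-subscribe to resume the data stream.
            run(Client, Topic, Partition);
        {ConsumerPid, _Other} ->
            %% E.g. an empty or incomplete batch: nothing to ack.
            loop(Client, Topic, Partition, ConsumerPid)
    end.
```

Acking only after processing is what lets the prefetch limit act as back-pressure: a slow subscriber leaves messages pending and the consumer stops fetching.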
Equivalent to sync_produce_request(CallRef, infinity).
Block and wait for a sent produce request to be acked by Kafka.
This way, you can turn asynchronous requests, made by produce/5 and friends, into synchronous ones.
{ok, CallRef} = brod:produce(
  brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>
). % returns immediately
% the following call waits and returns after the ack is received or timed out
brod:sync_produce_request(CallRef, 5_000).
Equivalent to sync_produce_request_offset(CallRef, infinity).
-spec sync_produce_request_offset(call_ref(), timeout()) -> {ok, offset()} | {error, Reason :: any()}.
As sync_produce_request/2, but also returning the assigned offset.
-spec transaction(client(), transactional_id(), transaction_config()) -> {ok, transaction()}.
Equivalent to brod_transaction:start_link/3.
Start a new transaction. TxId will be the id of the transaction.
-spec txn_add_offsets(transaction(), group_id(), offsets_to_commit()) -> ok | {error, any()}.
See also: brod_transaction:add_offsets/3.
-spec txn_do(txn_function(), client(), txn_do_options()) -> {ok, pid()} | {error, any()}.
See also: brod_transaction_processor:do/3.
-spec txn_produce(transaction(), topic(), partition(), batch_input()) -> {ok, offset()} | {error, any()}.
See also: brod_transaction:produce/5.
-spec txn_produce(transaction(), topic(), partition(), key(), value()) -> {ok, offset()} | {error, any()}.
See also: brod_transaction:produce/5.
Unsubscribe the current subscriber.
Assuming the subscriber is self().
Unsubscribe the current subscriber.
Assuming the subscriber is self().