View Source brod (brod v3.16.3)
Link to this section Summary
Types
fold always returns when it reaches the high watermark offset. fold also returns when any of the limits is hit.
Functions
Connect to consumer group coordinator broker.
Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
create_topics, response. See kpro_schema.erl for struct details
delete_topics, response. See kpro_schema.erl for struct details
describe_groups response body field named groups. See kpro_schema.erl for struct details
{Endpoints, ConnConfig} so to establish a new connection before fetch.
{Endpoints, ConnConfig} so to establish a new connection before fetch.
fetch_committed_offsets/3, but works with a started brod_client
responses field of the offset_fetch response. See kpro_schema.erl for struct details.
lists:foldl/2 but with below stop conditions: * Always return after reach high watermark offset * Return after the given message count limit is reached * Return after the given kafka offset is reached. * Return if the FoldFun returns an {error, Reason} tuple. NOTE: Exceptions from evaluating FoldFun are not caught.
metadata response. See kpro_schema.erl for details
metadata response. See kpro_schema.erl for struct details
metadata response. See kpro_schema.erl for struct details
Equivalent to brod_client:get_producer/3.
Equivalent to produce(Pid, <<>>, Value).
Value is a binary or an iolist. Otherwise send a batch, if Value is a (nested) key-value list, or a list of maps. In this case Key is discarded (only the keys in the key-value list are sent to Kafka). The pid should be a partition producer pid, NOT client pid. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
Value is a binary or an iolist. Otherwise send a batch if Value is a (nested) key-value list, or a list of maps. In this case Key is used only for partitioning, or discarded if the 3rd argument is a partition number instead of a partitioner callback. This function first looks up the producer pid, then calls produce/3 to do the real work. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
produce/3, only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka.
produce/5 only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka. Return the partition to caller as {ok, Partition} for caller to correlate the callback when the 3rd arg is not a partition number.
Equivalent to produce_sync(Pid, <<>>, Value).
Sync version of produce/3
?BROD_PRODUCE_UNKNOWN_OFFSET.
Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1).
Equivalent to start_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_client(BootstrapEndpoints, ClientId, []).
Start a client.
See also: brod_consumer:start_link/5 for details about consumer config.
Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
See also: brod_group_subscriber:start_link/7.
See also: brod_group_subscriber:start_link/8.
See also: brod_topic_subscriber:start_link/7.
See also: brod_producer:start_link/4.
Subscribe to a data stream from the given topic-partition.
self().
self().
Link to this section Types
-type batch_input() :: [msg_input()].
-type bootstrap() :: [endpoint()] | {[endpoint()], client_config()}.
-type call_ref() :: #brod_call_ref{}.
      -type cg() :: #brod_cg{}.
      -type cg_protocol_type() :: binary().
      -type client() :: client_id() | pid().
-type client_config() :: brod_client:config().
-type client_id() :: atom().
      -type compression() :: no_compression | gzip | snappy.
      -type conn_config() :: [{atom(), term()}] | kpro:conn_config().
-type connection() :: kpro:connection().
-type consumer_config() :: brod_consumer:config().
-type consumer_option() ::
    begin_offset | min_bytes | max_bytes | max_wait_time | sleep_timeout | prefetch_count |
    prefetch_bytes | offset_reset_policy | size_stat_window.
      -type consumer_options() :: [{consumer_option(), integer()}].
-type error_code() :: kpro:error_code().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
      -type fold_fun(Acc) :: fun((message(), Acc) -> {ok, Acc} | {error, any()}).
fold always returns when it reaches the high watermark offset. fold also returns when any of the limits is hit.
  -type fold_limits() :: #{message_count => pos_integer(), reach_offset => offset()}.
-type fold_result() :: {fold_acc(), OffsetToContinue :: offset(), fold_stop_reason()}.
-type fold_stop_reason() ::
    reached_end_of_partition | reached_message_count_limit | reached_target_offset |
    {error, any()}.
      -type group_config() :: proplists:proplist().
-type group_generation_id() :: non_neg_integer().
      -type group_id() :: kpro:group_id().
-type group_member() :: {group_member_id(), #kafka_group_member_metadata{}}.
-type group_member_id() :: binary().
      -type hostname() :: kpro:hostname().
-type key() :: undefined | binary().
      -type message() :: kpro:message().
-type message_set() :: #kafka_message_set{}.
      -type msg_input() :: kpro:msg_input().
-type msg_ts() :: kpro:msg_ts().
-type offset() :: kpro:offset().
-type offset_time() :: integer() | earliest | latest.
      -type partition() :: kpro:partition().
-type partitioner() :: partition_fun() | random | hash.
-type portnum() :: pos_integer().
      -type produce_reply() :: #brod_produce_reply{}.
      -type produce_result() :: brod_produce_req_buffered | brod_produce_req_acked.
      -type producer_config() :: brod_producer:config().
-type received_assignments() :: [#brod_received_assignment{}].
      -type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type value() :: undefined | iodata() | {msg_ts(), binary()} | [{key(), value()}] | [{msg_ts(), key(), value()}] | kpro:msg_input() | kpro:batch_input().
Link to this section Functions
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) -> {ok, pid()} | {error, any()}.
Connect to consumer group coordinator broker.
Done in steps:
- Connect to any of the given bootstrap endpoints
- Send group_coordinator_request to resolve group coordinator endpoint
- Connect to the resolved endpoint and return the connection pid
-spec connect_leader([endpoint()], topic(), partition(), conn_config()) -> {ok, pid()}.
-spec consume_ack(pid(), offset()) -> ok | {error, any()}.
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32(), validate_only => boolean()}) -> ok | {ok, kpro:struct()} | {error, any()}.
Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32(), validate_only => boolean()}, conn_config()) -> ok | {ok, kpro:struct()} | {error, any()}.
create_topics, response. See kpro_schema.erl for struct details
  Equivalent to delete_topics(Hosts, Topics, Timeout, []).
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) -> ok | {error, any()}.
delete_topics, response. See kpro_schema.erl for struct details
  -spec describe_groups(endpoint(), conn_config(), [group_id()]) -> {ok, [kpro:struct()]} | {error, any()}.
describe_groups response body field named groups. See kpro_schema.erl for struct details
  -spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), integer()) -> {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
{Endpoints, ConnConfig} so to establish a new connection before fetch.
  -spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), offset(), fetch_opts()) -> {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
{Endpoints, ConnConfig} so to establish a new connection before fetch.
  fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes)
View Source
fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, ConnConfig)
View Source
-spec fetch_committed_offsets(client(), group_id()) -> {ok, [kpro:struct()]} | {error, any()}.
Same as fetch_committed_offsets/3, but works with a started brod_client.
  -spec fetch_committed_offsets([endpoint()], conn_config(), group_id()) -> {ok, [kpro:struct()]} | {error, any()}.
responses field of the offset_fetch response. See kpro_schema.erl for struct details.
  -spec fold(connection() | client_id() | bootstrap(), topic(), partition(), offset(), fetch_opts(), Acc, fold_fun(Acc), fold_limits()) -> fold_result() when Acc :: fold_acc().
Works like lists:foldl/3 but with the stop conditions below:
* Always return after reaching the high watermark offset.
* Return after the given message count limit is reached.
* Return after the given kafka offset is reached.
* Return if the FoldFun returns an {error, Reason} tuple.
NOTE: Exceptions from evaluating FoldFun are not caught.
  -spec get_metadata([endpoint()]) -> {ok, kpro:struct()} | {error, any()}.
metadata response. See kpro_schema.erl for details
  -spec get_metadata([endpoint()], all | [topic()]) -> {ok, kpro:struct()} | {error, any()}.
metadata response. See kpro_schema.erl for struct details
  -spec get_metadata([endpoint()], all | [topic()], conn_config()) -> {ok, kpro:struct()} | {error, any()}.
metadata response. See kpro_schema.erl for struct details
  -spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason} when Reason :: client_down | {client_down, any()} | {producer_down, any()} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}.
Equivalent to brod_client:get_producer/3.
-spec list_all_groups([endpoint()], conn_config()) -> [{endpoint(), [cg()] | {error, any()}}].
-spec list_groups(endpoint(), conn_config()) -> {ok, [cg()]} | {error, any()}.
Equivalent to produce(Pid, <<>>, Value).
Produce one message if Value is a binary or an iolist. Otherwise send a batch, if Value is a (nested) key-value list, or a list of maps. In this case Key is discarded (only the keys in the key-value list are sent to Kafka). The pid should be a partition producer pid, NOT client pid. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
  -spec produce(client(), topic(), partition() | partitioner(), key(), value()) -> {ok, call_ref()} | {error, any()}.
Produce one message if Value is a binary or an iolist. Otherwise send a batch if Value is a (nested) key-value list, or a list of maps. In this case Key is used only for partitioning, or discarded if the 3rd argument is a partition number instead of a partitioner callback. This function first looks up the producer pid, then calls produce/3 to do the real work. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
  -spec produce_cb(pid(), key(), value(), produce_ack_cb()) -> ok | {error, any()}.
Same as produce/3, only the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from kafka.
  -spec produce_cb(client(), topic(), partition() | partitioner(), key(), value(), produce_ack_cb()) -> ok | {ok, partition()} | {error, any()}.
Same as produce/5, only the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from kafka. Return the partition to caller as {ok, Partition} for caller to correlate the callback when the 3rd arg is not a partition number.
  -spec produce_sync(pid(), value()) -> ok.
Equivalent to produce_sync(Pid, <<>>, Value).
Same asproduce/5 only the ack is not d
  Sync version of produce/3
This function will not return until the response is received from Kafka. But when producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
  -spec produce_sync_offset(client(), topic(), partition() | partitioner(), key(), value()) -> {ok, offset()} | {error, any()}.
?BROD_PRODUCE_UNKNOWN_OFFSET.
  Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1).
-spec resolve_offset([endpoint()], topic(), partition(), offset_time()) -> {ok, offset()} | {error, any()}.
-spec resolve_offset([endpoint()], topic(), partition(), offset_time(), conn_config()) -> {ok, offset()} | {error, any()}.
-spec resolve_offset([endpoint()], topic(), partition(), offset_time(), conn_config(), #{timeout => kpro:int32()}) -> {ok, offset()} | {error, any()}.
-spec start() -> ok | no_return().
      -spec start_client([endpoint()]) -> ok | {error, any()}.
Equivalent to start_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_client(BootstrapEndpoints, ClientId, []).
-spec start_client([endpoint()], client_id(), client_config()) -> ok | {error, any()}.
Start a client.
BootstrapEndpoints: Kafka cluster endpoints, can be any of the brokers in the cluster, which does not necessarily have to be the leader of any partition, e.g. a load-balanced entrypoint to the remote Kafka cluster.
ClientId: Atom to identify the client process.
Config is a proplist, possible values:
- restart_delay_seconds (optional, default=10): How long to wait between attempts to restart brod_client process when it crashes.
- get_metadata_timeout_seconds (optional, default=5): Return {error, timeout} from brod_client:get_xxx calls if responses for APIs such as metadata, find_coordinator are not received in time.
- reconnect_cool_down_seconds (optional, default=1): Delay this configured number of seconds before retrying to establish a new connection to the kafka partition leader.
- allow_topic_auto_creation (optional, default=true): By default, brod respects what is configured in the broker about topic auto-creation, i.e. whether auto.create.topics.enable is set in the broker configuration. However if allow_topic_auto_creation is set to false in client config, brod will avoid sending metadata requests that may cause an auto-creation of the topic regardless of what broker config is.
- auto_start_producers (optional, default=false): If true, brod client will spawn a producer automatically when user is trying to call produce but did not call brod:start_producer explicitly. Can be useful for applications which don't know beforehand which topics they will be working with.
- default_producer_config (optional, default=[]): Producer configuration to use when auto_start_producers is true. See brod_producer:start_link/4 for details about producer config.
Connection options can be added to the same proplist. See kpro_connection.erl in kafka_protocol for the details:
- ssl (optional, default=false): true | false | ssl:ssl_option(). true is translated to [] as ssl:ssl_option() i.e. all default.
- sasl (optional, default=undefined): Credentials for SASL/Plain authentication. {mechanism(), Filename} or {mechanism(), UserName, Password} where mechanism can be atoms: plain (for "PLAIN"), scram_sha_256 (for "SCRAM-SHA-256") or scram_sha_512 (for "SCRAM-SHA-512"). Filename should be a file consisting of two lines: the first line is the username and the second line is the password. Both Username and Password should be string() | binary().
- connect_timeout (optional, default=5000): Timeout when trying to connect to an endpoint.
- request_timeout (optional, default=240000, constraint: >= 1000): Timeout when waiting for a response, connection restart when timed out.
- query_api_versions (optional, default=true): Must be set to false to work with kafka versions prior to 0.10. When set to true, at connection start, brod will send a query request to get the broker supported API version ranges. When set to false, brod will always use the lowest supported API version when sending requests to kafka. Supported API version ranges can be found in: brod_kafka_apis:supported_versions/1
- extra_sock_opts (optional, default=[]): Extra socket options to tune socket performance, e.g. [{sndbuf, 1 bsl 20}]. More info
-spec start_consumer(client(), topic(), consumer_config()) -> ok | {error, any()}.
See also: brod_consumer:start_link/5 for details about consumer config.
-spec start_link_client([endpoint()]) -> {ok, pid()} | {error, any()}.
Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
-spec start_link_client([endpoint()], client_id(), client_config()) -> {ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig, ConsumerConfig, CbModule, CbInitArg)
View Source
-spec start_link_group_subscriber(client(), group_id(), [topic()], group_config(), consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
See also: brod_group_subscriber:start_link/7.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig, ConsumerConfig, MessageType, CbModule, CbInitArg)
View Source
-spec start_link_group_subscriber(client(), group_id(), [topic()], group_config(), consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
See also: brod_group_subscriber:start_link/8.
-spec start_link_group_subscriber_v2(brod_group_subscriber_v2:subscriber_config()) -> {ok, pid()} | {error, any()}.
-spec start_link_topic_subscriber(brod_topic_subscriber:topic_subscriber_config()) -> {ok, pid()} | {error, any()}.
See also: brod_topic_subscriber:start_link/1.
start_link_topic_subscriber(Client, Topic, ConsumerConfig, CbModule, CbInitArg)
View Source
-spec start_link_topic_subscriber(client(), topic(), consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).
start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, CbModule, CbInitArg)
View Source
-spec start_link_topic_subscriber(client(), topic(), all | [partition()], consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).
start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, MessageType, CbModule, CbInitArg)
View Source
-spec start_link_topic_subscriber(client(), topic(), all | [partition()], consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
See also: brod_topic_subscriber:start_link/7.
-spec start_producer(client(), topic(), producer_config()) -> ok | {error, any()}.
See also: brod_producer:start_link/4.
-spec stop() -> ok.
      -spec stop_client(client()) -> ok.
-spec subscribe(pid(), pid(), consumer_options()) -> ok | {error, any()}.
-spec subscribe(client(), pid(), topic(), partition(), consumer_options()) -> {ok, pid()} | {error, any()}.
Subscribe to a data stream from the given topic-partition.
If {error, Reason} is returned, the caller should perhaps retry later.
{ok, ConsumerPid} is returned on success. The caller may want to monitor the consumer pid and re-subscribe should the ConsumerPid crash.
Upon successful subscription the subscriber process should expect messages of pattern: {ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}.
Use -include_lib("brod/include/brod.hrl") to access the records.
If #kafka_fetch_error{} is received, the subscriber should re-subscribe itself to resume the data stream.
  -spec sync_produce_request(call_ref()) -> ok | {error, Reason :: any()}.
-spec sync_produce_request(call_ref(), timeout()) -> ok | {error, Reason :: any()}.
-spec unsubscribe(pid()) -> ok | {error, any()}.
      self().
  -spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
      self().