brod (brod v4.0.0)

Summary

Types

A record with caller, callee, and ref.

Connection configuration that will be passed to kpro calls.

Consumer configuration.

fold always returns when it reaches the high watermark offset. It also returns when any of the limits is hit.
OffsetToContinue: begin offset for the next fold call
A record with offset, key, value, ts_type, ts, and headers.

A record with topic, partition, high_wm_offset (max offset of the partition), and messages.

Unix time in milliseconds
Physical offset (an integer)

A record with call_ref, base_offset, and result.

Functions

Abort the transaction

See also: brod_transaction:abort/1.

Commit the transaction

See also: brod_transaction:commit/1.

Connect to consumer group coordinator broker.

Acknowledge that one or more messages have been processed.

Fetch a single message set from the given topic-partition.

Fetch a single message set from the given topic-partition.

Fetch a single message set from the given topic-partition.
Same as fetch_committed_offsets/3, but works with a started brod_client.

Fetch committed offsets for ALL topics in the given consumer group.

Fetch broker metadata for all topics.

Fetch broker metadata for the given topics.

Fetch broker metadata for the given topics using the given connection options.

Get number of partitions for a given topic.

The same as get_partitions_count(Client, Topic), but guaranteed not to auto-create topics in Kafka, even when Kafka has topic auto-creation configured.

List ALL consumer groups in the given kafka cluster.

List consumer groups in the given group coordinator broker.

Produce one or more messages.

Produce one or more messages.

Same as produce/3, except that the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).

Same as produce/5, except that the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).

Find the partition worker and send message without any ack.

Version of produce_sync/5 that returns the offset assigned by Kafka.

Resolve semantic offset or timestamp to real offset.

Resolve semantic offset or timestamp to real offset.

Start brod application.
Application behaviour callback

Dynamically start topic consumer(s) and register it in the client.

Start group_subscriber_v2.

Dynamically start a per-topic producer and register it in the client.

Stop brod application.
Application behaviour callback
Stop a client.

Subscribe to a data stream from the given consumer.

Subscribe to a data stream from the given topic-partition.

Block and wait for a sent produce request to be acked by Kafka.

Equivalent to brod_transaction:start_link/3.

Add the offset consumed by a group to the transaction.

See also: brod_transaction:add_offsets/3.

Execute the function in the context of a fetch-produce cycle with access to an open transaction.

See also: brod_transaction_processor:do/3.

Produce the batch of messages to the indicated topic-partition synchronously.

See also: brod_transaction:produce/5.

Produce the message (key and value) to the indicated topic-partition synchronously.

See also: brod_transaction:produce/5.

Unsubscribe the current subscriber.

Unsubscribe the current subscriber.

Unsubscribe the current subscriber.

Unsubscribe the current subscriber.

Types

-type batch_input() :: [msg_input()].
-type bootstrap() :: [endpoint()] | {[endpoint()], client_config()}.
-type call_ref() ::
          #brod_call_ref{caller :: undefined | pid(),
                         callee :: undefined | pid(),
                         ref :: undefined | reference()}.
A record with caller, callee, and ref.
-type cg() :: #brod_cg{id :: brod:group_id(), protocol_type :: brod:cg_protocol_type()}.
-type cg_protocol_type() :: binary().
-type client() :: client_id() | pid().
-type client_config() :: brod_client:config().
-type client_id() :: atom().
-type compression() :: no_compression | gzip | snappy.
-type conn_config() :: [{atom(), term()}] | kpro:conn_config().

Connection configuration that will be passed to kpro calls.

For more info, see the kpro_connection:config() type.
-type connection() :: kpro:connection().
-type consumer_config() ::
          [{begin_offset, offset_time()} |
           {min_bytes, non_neg_integer()} |
           {max_bytes, non_neg_integer()} |
           {max_wait_time, integer()} |
           {sleep_timeout, integer()} |
           {prefetch_count, integer()} |
           {prefetch_bytes, non_neg_integer()} |
           {offset_reset_policy, brod_consumer:offset_reset_policy()} |
           {size_stat_window, non_neg_integer()} |
           {isolation_level, brod_consumer:isolation_level()}].

Consumer configuration.

The meaning of the options is documented at brod_consumer:start_link/5.
-type endpoint() :: {hostname(), portnum()}.
-type error_code() :: kpro:error_code().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
-type fold_fun(Acc) :: fun((message(), Acc) -> {ok, Acc} | {error, any()}).
fold always returns when it reaches the high watermark offset. It also returns when any of the limits is hit.
-type fold_limits() :: #{message_count => pos_integer(), reach_offset => offset()}.
-type fold_result() :: {fold_acc(), OffsetToContinue :: offset(), fold_stop_reason()}.
-type fold_stop_reason() ::
          reached_end_of_partition | reached_message_count_limit | reached_target_offset |
          {error, any()}.
OffsetToContinue: begin offset for the next fold call
-type group_config() :: proplists:proplist().
-type group_generation_id() :: non_neg_integer().
-type group_id() :: kpro:group_id().
-type group_member() ::
          {group_member_id(),
           #kafka_group_member_metadata{version :: non_neg_integer(),
                                        topics :: [brod:topic()],
                                        user_data :: binary()}}.
-type group_member_id() :: binary().
-type hostname() :: kpro:hostname().
-type key() :: undefined | binary().
-type message() :: kpro:message().
A record with offset, key, value, ts_type, ts, and headers.
-type message_set() ::
          #kafka_message_set{topic :: brod:topic(),
                             partition :: brod:partition(),
                             high_wm_offset :: integer(),
                             messages :: [brod:message()] | kpro:incomplete_batch()}.

A record with topic, partition, high_wm_offset (max offset of the partition), and messages.

See the definition for more information.
-type msg_input() :: kpro:msg_input().
-type msg_ts() :: kpro:msg_ts().
Unix time in milliseconds
-type offset() :: kpro:offset().
Physical offset (an integer)
-type offset_time() :: msg_ts() | earliest | latest.
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type partition_assignment() :: {topic(), [partition()]}.
-type partition_fun() :: fun((topic(), pos_integer(), key(), value()) -> {ok, partition()}).
-type partitioner() :: partition_fun() | random | hash.
-type portnum() :: pos_integer().
-type produce_ack_cb() :: fun((partition(), offset()) -> _).
-type produce_reply() ::
          #brod_produce_reply{call_ref :: brod:call_ref(),
                              base_offset :: undefined | brod:offset(),
                              result :: brod:produce_result()}.

A record with call_ref, base_offset, and result.

See the definition for more information.
-type produce_result() :: brod_produce_req_buffered | brod_produce_req_acked.
-type producer_config() :: brod_producer:config().
-type received_assignments() ::
          [#brod_received_assignment{topic :: brod:topic(),
                                     partition :: brod:partition(),
                                     begin_offset ::
                                         undefined | brod:offset() | {begin_offset, brod:offset_time()}}].
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type topic_partition() :: {topic(), partition()}.
-type transaction() :: brod_transaction:transaction().
-type transaction_config() :: brod_transaction:transaction_config().
-type transactional_id() :: brod_transaction:transactional_id().
-type txn_do_options() :: brod_transaction_processor:do_options().
-type txn_function() :: brod_transaction_processor:process_function().
-type value() ::
          undefined |
          iodata() |
          {msg_ts(), binary()} |
          [{key(), value()}] |
          [{msg_ts(), key(), value()}] |
          kpro:msg_input() |
          kpro:batch_input().

Functions

-spec abort(transaction()) -> ok | {error, any()}.
Abort the transaction

See also: brod_transaction:abort/1.

-spec commit(transaction()) -> ok | {error, any()}.
Commit the transaction

See also: brod_transaction:commit/1.

connect_group_coordinator(BootstrapEndpoints, ConnCfg, GroupId)
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) -> {ok, pid()} | {error, any()}.

Connect to consumer group coordinator broker.

Done in steps:
  1. Connect to any of the given bootstrap endpoints
  2. Send group_coordinator_request to resolve group coordinator endpoint
  3. Connect to the resolved endpoint and return the connection pid

connect_leader(Hosts, Topic, Partition, ConnConfig)
-spec connect_leader([endpoint()], topic(), partition(), conn_config()) -> {ok, pid()}.
Connect to the partition leader.

consume_ack(ConsumerPid, Offset)
-spec consume_ack(pid(), offset()) -> ok | {error, any()}.

Equivalent to brod_consumer:ack(ConsumerPid, Offset).

See consume_ack/4 for more information.

consume_ack(Client, Topic, Partition, Offset)
-spec consume_ack(client(), topic(), partition(), offset()) -> ok | {error, any()}.

Acknowledge that one or more messages have been processed.

brod_consumer sends message sets to the subscriber process and keeps the messages in a 'pending' queue. The subscriber may choose to ack any received offset. Acknowledging a greater offset automatically acknowledges the messages before it. For example, if messages [1, 2, 3, 4] have been sent (as one or more message sets) to the subscriber, the subscriber may acknowledge offset 3 to indicate that the first three messages have been processed successfully, leaving only message 4 pending.

The 'pending' queue has a size limit (see the prefetch_count consumer config) that provides a back-pressure mechanism: if too many messages are pending an ack, the consumer stops fetching new ones so the subscriber does not get overwhelmed.

Note that no range check is done on the acknowledged offset: if offsets [M, N] are pending, acknowledging with Offset > N removes all offsets from the pending queue, and acknowledging with Offset < M has no effect.

Use this function only with plain partition subscribers (i.e., when you manually call subscribe/5). Behaviours like brod_topic_subscriber have their own way of acking messages.
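
The ack rules above can be modeled as a pure function over an ascending list of pending offsets. This is a minimal sketch for illustration only: the module pending_ack_sketch and its ack/2 function are hypothetical and do not reproduce brod_consumer's actual implementation.

```erlang
%% Hypothetical model of the pending-queue ack rules; not brod_consumer code.
-module(pending_ack_sketch).
-export([ack/2]).

%% Pending holds the pending offsets in ascending order. Acking Offset drops
%% every pending offset =< Offset, so:
%%   - Offset within [M, N] removes M..Offset,
%%   - Offset > N empties the queue,
%%   - Offset < M leaves the queue unchanged.
-spec ack(integer(), [integer()]) -> [integer()].
ack(Offset, Pending) ->
    lists:dropwhile(fun(O) -> O =< Offset end, Pending).
```

For example, ack(3, [1, 2, 3, 4]) leaves [4] pending, matching the example in the text.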

create_topics(Hosts, TopicConfigs, RequestConfigs)
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()}) -> ok | {error, any()}.

Equivalent to create_topics(Hosts, TopicConfigs, RequestConfigs, []).

create_topics(Hosts, TopicConfigs, RequestConfigs, Options)
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()}, conn_config()) ->
                       ok | {error, any()}.

Create topic(s) in kafka.

TopicConfigs is a list of topic configurations. A topic configuration is a map (or tuple list for backward compatibility) with the following keys (all of them are required):
  • name

    The topic name.
  • num_partitions

    The number of partitions to create in the topic, or -1 if we are either specifying a manual partition assignment or using the default partitions.
  • replication_factor

    The number of replicas to create for each partition in the topic, or -1 if we are either specifying a manual partition assignment or using the default replication factor.
  • assignments

    The manual partition assignment, or the empty list if we let Kafka assign them automatically. It is a list of maps (or tuple lists) with the following keys: partition_index and broker_ids (a list of brokers to place the partition on).
  • configs

    The custom topic configurations to set. It is a list of maps (or tuple lists) with keys name and value. You can find possible options in the Kafka documentation.
Example:
  > TopicConfigs = [
      #{
        name => <<"my_topic">>,
        num_partitions => 1,
        replication_factor => 1,
        assignments => [],
        configs => [ #{name  => <<"cleanup.policy">>, value => "compact"}]
      }
    ].
  > brod:create_topics([{"localhost", 9092}], TopicConfigs, #{timeout => 1000}, []).
  ok
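
When specifying a manual partition assignment, num_partitions and replication_factor are set to -1, as described above. The helper below is a hypothetical sketch (topic_config_sketch:manual/3 is not part of brod) of how such a topic configuration map could be built; it assumes the map form of the configuration and takes one list of broker ids per partition.

```erlang
%% Hypothetical helper, not part of brod.
-module(topic_config_sketch).
-export([manual/3]).

%% Name: topic name (binary).
%% BrokersPerPartition: one list of broker ids per partition; the first
%%   inner list becomes partition 0, the next partition 1, and so on.
%% Configs: list of #{name => ..., value => ...} maps.
-spec manual(binary(), [[integer()]], [map()]) -> map().
manual(Name, BrokersPerPartition, Configs) ->
    Indexes = lists:seq(0, length(BrokersPerPartition) - 1),
    Assignments = [#{partition_index => I, broker_ids => Ids}
                   || {I, Ids} <- lists:zip(Indexes, BrokersPerPartition)],
    #{name => Name,
      %% -1 for both: the counts are implied by the manual assignment.
      num_partitions => -1,
      replication_factor => -1,
      assignments => Assignments,
      configs => Configs}.
```

For example, topic_config_sketch:manual(<<"my_topic">>, [[1, 2], [2, 3]], []) describes a two-partition topic with two replicas per partition, which could then be passed in the TopicConfigs list of create_topics/4.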

delete_topics(Hosts, Topics, Timeout)
-spec delete_topics([endpoint()], [topic()], pos_integer()) -> ok | {error, any()}.

Equivalent to delete_topics(Hosts, Topics, Timeout, []).

delete_topics(Hosts, Topics, Timeout, Options)
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) -> ok | {error, any()}.

Delete topic(s) from kafka.

Example:
  > brod:delete_topics([{"localhost", 9092}], ["my_topic"], 5000, []).
  ok

describe_groups(CoordinatorEndpoint, ConnCfg, IDs)
-spec describe_groups(endpoint(), conn_config(), [group_id()]) -> {ok, [kpro:struct()]} | {error, any()}.

Describe consumer groups.

All the given consumer group IDs should be managed by the coordinator broker running at the given endpoint; otherwise, error codes are returned in the result structs. Returns the groups field of the describe_groups response body. See kpro_schema.erl for struct details.

fetch(ConnOrBootstrap, Topic, Partition, Offset)
-spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), integer()) ->
               {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.

Fetch a single message set from the given topic-partition.

Calls fetch/5 with the default options: max_wait_time = 1 second, min_bytes = 1 B, and max_bytes = 2^20 B (1 MB).

See fetch/5 for more information.

fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts)
-spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), offset(), fetch_opts()) ->
               {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.

Fetch a single message set from the given topic-partition.

The first argument can be either an already established connection to the partition leader, or {Endpoints, ConnConfig} (or just Endpoints) to establish a new connection before fetching.

The fourth argument is the start offset of the query. Messages with an offset greater than or equal to it will be fetched.

You can also pass options for the fetch query. See the kpro_req_lib:fetch_opts() type for their documentation. Only max_wait_time, min_bytes, max_bytes, and isolation_level options are currently supported. The defaults are the same as documented in the linked type, except for min_bytes which defaults to 1 in brod. Note that max_bytes will be rounded up so that full messages are retrieved. For example, if you specify max_bytes = 42 and there are three messages of size 40 bytes, two of them will be fetched.

On success, the function returns the messages along with the last stable offset (when using read_committed mode, the last committed offset) or the high watermark offset (the offset of the last message that was successfully copied to all replicas, incremented by 1), whichever is lower. In essence, this is the offset up to which it was possible to read messages at the time of fetching. This is similar to what resolve_offset/6 with latest returns. You can use this information to determine how far from the end of the topic you currently are. Note that when you use this offset as the start offset for a subsequent call, an empty list of messages will be returned (assuming the topic hasn't changed, e.g. no new message arrived). Only an offset greater than this one results in {error, offset_out_of_range}.

Note also that Kafka batches messages in a message set only up to the end of the topic segment that contains the first retrieved message, so there may be more messages after the last fetched offset even if the fetched size is significantly less than the max_bytes provided in fetch_opts(). See this issue for more details.

Example (the topic has only two messages):
  > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 0, #{max_bytes => 1024}).
  {ok,{2,
       [{kafka_message,0,<<"some_key">>,<<"Hello world!">>,
                       create,1663940976473,[]},
        {kafka_message,1,<<"another_key">>,<<"This is a message with offset 1.">>,
                       create,1663940996335,[]}]}}
 
  > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 2, #{max_bytes => 1024}).
  {ok,{2,[]}}
 
  > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 3, #{max_bytes => 1024}).
  {error,offset_out_of_range}
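
The HwOffset returned by fetch/5 can be combined with the offsets of the fetched messages to decide where the next fetch should begin and how far behind the end of the partition the caller is. A minimal sketch, assuming a hypothetical module fetch_lag_sketch; it matches kafka_message records as plain 7-tuples (the shape printed in the shell output above) so that it compiles without brod's header files. The distance is counted in offsets, which may exceed the number of messages actually remaining (e.g. on compacted topics).

```erlang
%% Hypothetical helper, not part of brod.
-module(fetch_lag_sketch).
-export([progress/2]).

%% BeginOffset: the Offset argument that was passed to fetch/5.
%% {HwOffset, Messages}: the payload of its {ok, {HwOffset, Messages}} result.
%% Returns {NextOffset, OffsetsBehind}: where the next fetch should begin,
%% and how many offsets lie between NextOffset and HwOffset.
-spec progress(integer(), {integer(), [tuple()]}) -> {integer(), integer()}.
progress(BeginOffset, {HwOffset, []}) ->
    {BeginOffset, HwOffset - BeginOffset};
progress(_BeginOffset, {HwOffset, Messages}) ->
    %% kafka_message records are 7-tuples with the offset as element 2.
    {kafka_message, LastOffset, _Key, _Value, _TsType, _Ts, _Headers} =
        lists:last(Messages),
    NextOffset = LastOffset + 1,
    {NextOffset, HwOffset - NextOffset}.
```

With the two-message topic from the example above, progress(0, {2, [M0, M1]}) yields {2, 0}: the next fetch should start at offset 2 and the caller has caught up.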

fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes)
This function is deprecated. Please use fetch/5 instead.
-spec fetch([endpoint()],
            topic(),
            partition(),
            offset(),
            non_neg_integer(),
            non_neg_integer(),
            pos_integer()) ->
               {ok, [message()]} | {error, any()}.

Equivalent to fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, []).

fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, ConnConfig)
This function is deprecated. Please use fetch/5 instead.
-spec fetch([endpoint()],
            topic(),
            partition(),
            offset(),
            non_neg_integer(),
            non_neg_integer(),
            pos_integer(),
            conn_config()) ->
               {ok, [message()]} | {error, any()}.
Fetch a single message set from the given topic-partition.

fetch_committed_offsets(Client, GroupId)
-spec fetch_committed_offsets(client(), group_id()) -> {ok, [kpro:struct()]} | {error, any()}.
Same as fetch_committed_offsets/3, but works with a started brod_client.

fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId)
-spec fetch_committed_offsets([endpoint()], conn_config(), group_id()) ->
                                 {ok, [kpro:struct()]} | {error, any()}.

Fetch committed offsets for ALL topics in the given consumer group.

Return the responses field of the offset_fetch response. See kpro_schema.erl for struct details.

fold(Bootstrap, Topic, Partition, Offset, Opts, Acc, Fun, Limits)
-spec fold(connection() | client_id() | bootstrap(),
           topic(),
           partition(),
           offset(),
           fetch_opts(),
           Acc,
           fold_fun(Acc),
           fold_limits()) ->
              fold_result()
              when Acc :: fold_acc().

Fold through messages in a partition.

Works like lists:foldl/3, but with the following stop conditions:
  • Always returns after reaching the high watermark offset
  • Returns after the given message count limit is reached
  • Returns after the given Kafka offset is reached
  • Returns if the FoldFun returns an {error, Reason} tuple
NOTE: Exceptions from evaluating FoldFun are not caught.
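
A fold_fun(Acc) receives each message together with the accumulator and returns {ok, NewAcc} to continue or {error, Reason} to stop. The sketch below (hypothetical module fold_fun_sketch, not part of brod) sums value sizes and stops once the next message would exceed a byte budget. It matches kafka_message records as 7-tuples, in the field order shown in the fetch/5 example, rather than including brod's headers.

```erlang
%% Hypothetical fold_fun(Acc) factory, not part of brod.
-module(fold_fun_sketch).
-export([budget_fun/1]).

%% The accumulator is the total value size in bytes seen so far.
%% Returns {error, {byte_budget_exceeded, Acc}}, which stops the fold,
%% as soon as adding the next message would exceed MaxBytes.
-spec budget_fun(non_neg_integer()) ->
          fun((tuple(), non_neg_integer()) ->
                  {ok, non_neg_integer()} | {error, term()}).
budget_fun(MaxBytes) ->
    fun({kafka_message, _Offset, _Key, Value, _TsType, _Ts, _Headers}, Acc) ->
            Total = Acc + byte_size(Value),
            case Total > MaxBytes of
                true -> {error, {byte_budget_exceeded, Acc}};
                false -> {ok, Total}
            end
    end.
```

It could be passed as Fun, e.g. brod:fold(Bootstrap, Topic, 0, 0, #{max_bytes => 1024}, 0, fold_fun_sketch:budget_fun(4096), #{message_count => 100}), which returns {Acc, OffsetToContinue, StopReason} as described by fold_result().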

get_consumer(Client, Topic, Partition)
-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason}
                      when
                          Reason ::
                              client_down |
                              {client_down, any()} |
                              {consumer_down, any()} |
                              {consumer_not_found, topic()} |
                              {consumer_not_found, topic(), partition()}.

-spec get_metadata([endpoint()]) -> {ok, kpro:struct()} | {error, any()}.

Fetch broker metadata for all topics.

See get_metadata/3 for more information.

get_metadata(Hosts, Topics)
-spec get_metadata([endpoint()], all | [topic()]) -> {ok, kpro:struct()} | {error, any()}.

Fetch broker metadata for the given topics.

See get_metadata/3 for more information.

get_metadata(Hosts, Topics, Options)
-spec get_metadata([endpoint()], all | [topic()], conn_config()) -> {ok, kpro:struct()} | {error, any()}.

Fetch broker metadata for the given topics using the given connection options.

The response differs in each version of the Metadata API call. The latest supported Metadata API version is 2, so this version will probably be used (if your Kafka supports it too). See kafka.bnf (search for MetadataResponseV2) for the response schema with comments.

Beware that when auto.create.topics.enable is set to true in the broker configuration, fetching metadata with a concrete topic specified (in the Topics parameter) may cause creation of the topic when it does not exist. If you want a safe get_metadata call, always pass all as Topics and then filter them.

  > brod:get_metadata([{"localhost", 9092}], [<<"my_topic">>], []).
  {ok,#{brokers =>
            [#{host => <<"localhost">>,node_id => 1,port => 9092,
               rack => <<>>}],
        cluster_id => <<"jTb2faMLRf6p21yD1y3v-A">>,
        controller_id => 1,
        topics =>
            [#{error_code => no_error,is_internal => false,
               name => <<"my_topic">>,
               partitions =>
                   [#{error_code => no_error,
                      isr_nodes => [1],
                      leader_id => 1,partition_index => 1,
                      replica_nodes => [1]},
                    #{error_code => no_error,
                      isr_nodes => [1],
                      leader_id => 1,partition_index => 0,
                      replica_nodes => [1]}]}]}}

get_partitions_count(Client, Topic)
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.

Get number of partitions for a given topic.

The higher level producers may need the partition numbers to find the partition producer pid – if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions. NOTE: The partitions count is cached for 120 seconds.
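
As an illustration of the round-robin strategy mentioned above, the sketch below builds a partition_fun() (see the partitioner() type) that cycles through the partitions using an unsigned counter from OTP's atomics module. The module rr_partition_sketch is hypothetical, and it assumes the pos_integer() argument brod passes to a partition_fun() is the topic's partition count.

```erlang
%% Hypothetical round-robin partition_fun() factory, not part of brod.
-module(rr_partition_sketch).
-export([round_robin/0]).

%% Assumes the second argument is the topic's partition count.
round_robin() ->
    %% One unsigned 64-bit counter shared by every call of the returned fun;
    %% unsigned so that wrap-around goes back to 0 and never turns negative.
    Counter = atomics:new(1, [{signed, false}]),
    fun(_Topic, PartitionCount, _Key, _Value) ->
            N = atomics:add_get(Counter, 1, 1),
            {ok, N rem PartitionCount}
    end.
```

Because the counter is created once per returned fun, all produce calls that share the same fun share one cycle; creating a new fun per call would always pick the same partition.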

get_partitions_count_safe(Client, Topic)
-spec get_partitions_count_safe(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
The same as get_partitions_count(Client, Topic), but guaranteed not to auto-create topics in Kafka, even when Kafka has topic auto-creation configured.

get_producer(Client, Topic, Partition)
-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason}
                      when
                          Reason ::
                              client_down |
                              {client_down, any()} |
                              {producer_down, any()} |
                              {producer_not_found, topic()} |
                              {producer_not_found, topic(), partition()}.

Equivalent to brod_client:get_producer(Client, Topic, Partition).

list_all_groups(Endpoints, ConnCfg)
-spec list_all_groups([endpoint()], conn_config()) -> [{endpoint(), [cg()] | {error, any()}}].

List ALL consumer groups in the given kafka cluster.

NOTE: Raises an exception if it fails to connect to any of the coordinator brokers.

list_groups(CoordinatorEndpoint, ConnCfg)
-spec list_groups(endpoint(), conn_config()) -> {ok, [cg()]} | {error, any()}.
List consumer groups in the given group coordinator broker.

-spec produce(pid(), value()) -> {ok, call_ref()} | {error, any()}.

Equivalent to produce(Pid, <<>>, Value).

produce(ProducerPid, Key, Value)
-spec produce(pid(), key(), value()) -> {ok, call_ref()} | {error, any()}.

Produce one or more messages.

See produce/5 for information about possible shapes of Value.

The pid should be a partition producer pid, NOT a client pid.

The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
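
The {ok, CallRef} return value can be matched against the reply message, as sketched below. The module produce_ack_wait_sketch and its wait_acked/2 function are hypothetical. The reply is matched as a plain {brod_produce_reply, CallRef, BaseOffset, Result} tuple following the field order of the produce_reply() type; real code would more likely include brod's header file and match the #brod_produce_reply{} record.

```erlang
%% Hypothetical helper, not part of brod.
-module(produce_ack_wait_sketch).
-export([wait_acked/2]).

%% CallRef: the call_ref() from {ok, CallRef} = brod:produce(...).
%% Waits up to Timeout milliseconds for the matching acked reply and
%% returns its base offset.
-spec wait_acked(tuple(), timeout()) -> {ok, term()} | {error, timeout}.
wait_acked(CallRef, Timeout) ->
    receive
        %% CallRef is already bound, so only the reply for this call matches.
        {brod_produce_reply, CallRef, BaseOffset, brod_produce_req_acked} ->
            {ok, BaseOffset}
    after Timeout ->
        {error, timeout}
    end.
```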

produce(Client, Topic, Partition, Key, Value)
-spec produce(client(), topic(), partition() | partitioner(), key(), value()) ->
                 {ok, call_ref()} | {error, any()}.

Produce one or more messages.

Value can have many different forms:
  • binary(): Single message with key from the Key argument
  • {brod:msg_ts(), binary()}: Single message with its create-time timestamp and key from Key
  • #{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}: Single message; if this map does not have a key field, Key is used instead
  • [{K, V} | {T, K, V}]: A batch, where V could be a nested list of such representation
  • [#{key => K, value => V, ts => T, headers => [{_, _}]}]: A batch

When Value is a batch, the Key argument is only used as partitioner input and all messages are written on the same partition.
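
The map-based batch form listed above could be built from key-value pairs as sketched below. The module batch_sketch is hypothetical; it stamps every message with the current time in milliseconds (msg_ts() is Unix time in milliseconds) and sets empty headers.

```erlang
%% Hypothetical helper, not part of brod.
-module(batch_sketch).
-export([batch/1]).

%% Turns [{Key, Value}] into the map-based batch form
%% [#{key => K, value => V, ts => T, headers => []}].
%% All messages share one timestamp: the current Unix time in milliseconds.
-spec batch([{binary(), binary()}]) -> [map()].
batch(KVs) ->
    Ts = erlang:system_time(millisecond),
    [#{key => K, value => V, ts => Ts, headers => []} || {K, V} <- KVs].
```

Passing the result as Value to produce/5 or produce_sync/5 writes all messages to a single partition, with the Key argument used only as partitioner input.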

ts field is dropped for kafka prior to version 0.10 (produce API version 0, magic version 0). headers field is dropped for kafka prior to version 0.11 (produce API version 0-2, magic version 0-1).

Partition may be either a concrete partition (an integer) or a partitioner (see partitioner() for more info).
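
A custom partition_fun() receives the topic, a pos_integer() (assumed here to be the partition count), and the message key and value, and returns {ok, Partition}. The hypothetical module partition_fun_sketch below maps each key to a stable partition with erlang:phash2/2; it is an illustration and not necessarily how brod's built-in hash partitioner works.

```erlang
%% Hypothetical partition_fun() factory, not part of brod.
-module(partition_fun_sketch).
-export([by_key/0]).

%% Returns fun((Topic, PartitionCount, Key, Value) -> {ok, Partition}).
%% erlang:phash2(Term, Range) yields an integer in 0..Range-1, so equal keys
%% land on the same partition as long as the partition count is unchanged.
by_key() ->
    fun(_Topic, PartitionCount, Key, _Value) ->
            {ok, erlang:phash2(Key, PartitionCount)}
    end.
```

The returned fun could be passed as the Partition argument, e.g. brod:produce_sync(Client, Topic, partition_fun_sketch:by_key(), Key, Value).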

A producer for the particular topic has to be already started (by calling start_producer/3), unless you have specified auto_start_producers = true when starting the client.

This function first looks up the producer pid, then calls produce/3 to do the real work.

The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} (see the produce_reply() type) message after the produce request has been acked by Kafka.

Example:
  > brod:produce(my_client, <<"my_topic">>, 0, "key", <<"Hello from erlang!">>).
  {ok,{brod_call_ref,<0.83.0>,<0.133.0>,#Ref<0.3024768151.2556690436.92841>}}
  > flush().
  Shell got {brod_produce_reply,
                {brod_call_ref,<0.83.0>,<0.133.0>,
                    #Ref<0.3024768151.2556690436.92841>},
                12,brod_produce_req_acked}

produce_cb(ProducerPid, Key, Value, AckCb)
-spec produce_cb(pid(), key(), value(), produce_ack_cb()) -> ok | {error, any()}.
Same as produce/3, except that the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).

produce_cb(Client, Topic, Part, Key, Value, AckCb)
-spec produce_cb(client(), topic(), partition() | partitioner(), key(), value(), produce_ack_cb()) ->
                    ok | {ok, partition()} | {error, any()}.

Same as produce/5, except that the ack is not delivered as a message; instead, the callback is evaluated by the producer worker when the ack is received from Kafka (see the produce_ack_cb() type).

When the third argument is not a partition number, returns {ok, Partition} so that the caller can correlate the callback.
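
Since the producer worker evaluates the callback itself when the ack arrives, a simple pattern is to forward the ack to an interested process. A minimal sketch; the module produce_cb_sketch and the {produce_acked, Partition, Offset} message format are hypothetical.

```erlang
%% Hypothetical produce_ack_cb() factory, not part of brod.
-module(produce_cb_sketch).
-export([notify/1]).

%% Returns fun((Partition, Offset) -> _) that only forwards the ack to Owner,
%% keeping the work done inside the producer worker small.
notify(Owner) ->
    fun(Partition, Offset) ->
            Owner ! {produce_acked, Partition, Offset},
            ok
    end.
```

It could be passed as AckCb, e.g. brod:produce_cb(my_client, <<"my_topic">>, 0, <<"key">>, <<"value">>, produce_cb_sketch:notify(self())), where my_client is a hypothetical client id.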

produce_no_ack(Client, Topic, Part, Key, Value)
-spec produce_no_ack(client(), topic(), partition() | partitioner(), key(), value()) ->
                        ok | {error, any()}.

Find the partition worker and send message without any ack.

NOTE: This call applies no back-pressure to the caller; excessive usage may cause the BEAM to run out of memory.

produce_sync(Pid, Value)
-spec produce_sync(pid(), value()) -> ok | {error, any()}.

Equivalent to produce_sync(Pid, <<>>, Value).

produce_sync(Pid, Key, Value)
-spec produce_sync(pid(), key(), value()) -> ok | {error, any()}.

Sync version of produce/3.

This function does not return until the response is received from Kafka. However, when the producer is started with required_acks set to 0, it returns once the messages are buffered in the producer process.

produce_sync(Client, Topic, Partition, Key, Value)
-spec produce_sync(client(), topic(), partition() | partitioner(), key(), value()) ->
                      ok | {error, any()}.

Sync version of produce/5.

This function does not return until a response is received from Kafka. However, if the producer is started with required_acks set to 0, it returns once the messages are buffered in the producer process.

produce_sync_offset(Client, Topic, Partition, Key, Value)
-spec produce_sync_offset(client(), topic(), partition() | partitioner(), key(), value()) ->
                             {ok, offset()} | {error, any()}.

Version of produce_sync/5 that returns the offset assigned by Kafka.

If the producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.

resolve_offset(Hosts, Topic, Partition)
-spec resolve_offset([endpoint()], topic(), partition()) -> {ok, offset()} | {error, any()}.

Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).

resolve_offset(Hosts, Topic, Partition, Time)
-spec resolve_offset([endpoint()], topic(), partition(), offset_time()) ->
                        {ok, offset()} | {error, any()}.

Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).

resolve_offset(Hosts, Topic, Partition, Time, ConnCfg)
-spec resolve_offset([endpoint()], topic(), partition(), offset_time(), conn_config()) ->
                        {ok, offset()} | {error, any()}.

Resolve semantic offset or timestamp to real offset.

The same as resolve_offset/6 but the timeout is extracted from connection config.

resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts)
-spec resolve_offset([endpoint()],
                     topic(),
                     partition(),
                     offset_time(),
                     conn_config(),
                     #{timeout => kpro:int32()}) ->
                        {ok, offset()} | {error, any()}.

Resolve semantic offset or timestamp to real offset.

The function returns the offset of the first message with the given timestamp, or of the first message after the given timestamp (in case no message matches the timestamp exactly), or -1 if the timestamp is newer than (>) all messages in the topic.

You can also use two semantic offsets instead of a timestamp: earliest gives you the offset of the first message in the topic and latest gives you the offset of the last message incremented by 1.

If the topic is empty, both earliest and latest return the same value (which is 0 unless some messages were deleted from the topic), and any timestamp returns -1.

An example for illustration:
  Messages:
  offset       0   1   2   3
  timestamp    10  20  20  30

  Calls:
  resolve_offset(Endpoints, Topic, Partition, 5)        -> 0
  resolve_offset(Endpoints, Topic, Partition, 10)       -> 0
  resolve_offset(Endpoints, Topic, Partition, 13)       -> 1
  resolve_offset(Endpoints, Topic, Partition, 20)       -> 1
  resolve_offset(Endpoints, Topic, Partition, 31)       -> -1
  resolve_offset(Endpoints, Topic, Partition, earliest) -> 0
  resolve_offset(Endpoints, Topic, Partition, latest)   -> 4
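
The lookup rules above can be modelled as a pure function over one partition's messages. The module below is a hypothetical illustration (resolve_offset_model and resolve/2 are not part of brod; the real lookup is performed by the Kafka broker). It assumes messages are given as {Offset, Timestamp} pairs in offset order and that no messages were deleted, so an empty partition starts at offset 0.

```erlang
%% Hypothetical illustration of the resolve_offset semantics; not part of brod.
%% Messages is a list of {Offset, Timestamp} pairs in offset order.
%% Assumes no messages were deleted, so an empty partition starts at offset 0.
-module(resolve_offset_model).
-export([resolve/2]).

-spec resolve([{non_neg_integer(), integer()}], earliest | latest | integer()) ->
          integer().
resolve([], earliest) ->
    0;
resolve([{FirstOffset, _Ts} | _], earliest) ->
    FirstOffset;
resolve([], latest) ->
    0;
resolve(Messages, latest) ->
    %% Offset of the last message incremented by 1.
    {LastOffset, _Ts} = lists:last(Messages),
    LastOffset + 1;
resolve(Messages, Time) when is_integer(Time) ->
    %% First message whose timestamp equals or follows Time, else -1.
    case [O || {O, Ts} <- Messages, Ts >= Time] of
        [Offset | _] -> Offset;
        [] -> -1
    end.
```

With the messages from the table, resolve_offset_model:resolve([{0, 10}, {1, 20}, {2, 20}, {3, 30}], 13) yields 1, matching the call list above.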

start()

-spec start() -> ok | no_return().

Start brod application.

start(StartType, StartArgs)

Application behaviour callback

start_client(BootstrapEndpoints)

-spec start_client([endpoint()]) -> ok | {error, any()}.

Equivalent to start_client(BootstrapEndpoints, brod_default_client).

start_client(BootstrapEndpoints, ClientId)

-spec start_client([endpoint()], client_id()) -> ok | {error, any()}.

Equivalent to start_client(BootstrapEndpoints, ClientId, []).

start_client(BootstrapEndpoints, ClientId, Config)

-spec start_client([endpoint()], client_id(), client_config()) -> ok | {error, any()}.

Start a client (brod_client).

BootstrapEndpoints: Kafka cluster endpoints. These can be any brokers in the cluster; they do not have to be the leader of any partition, and a load-balanced entry point to the remote Kafka cluster also works.

ClientId: Atom to identify the client process.

Config is a proplist, possible values:
  • restart_delay_seconds (optional, default=10)

    How long to wait between attempts to restart brod_client process when it crashes.
  • get_metadata_timeout_seconds (optional, default=5)

    Return {error, timeout} from brod_client:get_xxx calls if responses for APIs such as metadata, find_coordinator are not received in time.
  • reconnect_cool_down_seconds (optional, default=1)

    Number of seconds to wait before retrying to establish a new connection to the Kafka partition leader.
  • allow_topic_auto_creation (optional, default=true)

    By default, brod respects the broker's topic auto-creation setting, i.e. whether auto.create.topics.enable is set in the broker configuration. However, if allow_topic_auto_creation is set to false in the client config, brod avoids sending metadata requests that may cause a topic to be auto-created, regardless of the broker configuration.
  • auto_start_producers (optional, default=false)

    If true, the brod client automatically spawns a producer when produce is called for a topic whose producer was not started explicitly with brod:start_producer. This can be useful for applications that don't know in advance which topics they will be working with.
  • default_producer_config (optional, default=[])

    Producer configuration to use when auto_start_producers is true. See brod_producer:start_link/4 for details about producer config.

Connection options can be added to the same proplist. See kpro_connection.erl in kafka_protocol for the details:

  • ssl (optional, default=false)

    true | false | ssl:ssl_option(). true is translated to [] as ssl:ssl_option(), i.e. all defaults.
  • sasl (optional, default=undefined)

    Credentials for SASL authentication: {mechanism(), Filename} or {mechanism(), UserName, Password}, where mechanism can be one of the atoms plain (for "PLAIN"), scram_sha_256 (for "SCRAM-SHA-256") or scram_sha_512 (for "SCRAM-SHA-512"). Filename should be a file consisting of two lines: the first line is the username and the second line is the password. Both UserName and Password should be string() | binary().
  • connect_timeout (optional, default=5000)

    Timeout when trying to connect to an endpoint.
  • request_timeout (optional, default=240000, constraint: >= 1000)

    Timeout when waiting for a response; the connection is restarted when it times out.
  • query_api_versions (optional, default=true)

    Must be set to false to work with Kafka versions prior to 0.10. When set to true, brod sends a query request at connection start to get the API version ranges supported by the broker. When set to false, brod always uses the lowest supported API version when sending requests to Kafka. Supported API version ranges can be found in brod_kafka_apis:supported_versions/1.
  • extra_sock_opts (optional, default=[])

    Extra socket options to tune socket performance, e.g. [{sndbuf, 1 bsl 20}]. See the gen_tcp option() type for more info.
You can read more about clients in the overview.
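
As a sketch of how client options and connection options share one proplist, the hypothetical module below builds a config and passes it to start_client/3. The module name my_brod_client, the client id my_client, the host names, the credentials and every value shown are placeholders rather than recommendations; actually starting the client assumes brod is available as a dependency and the cluster is reachable.

```erlang
%% Hypothetical example module; names, hosts, credentials and values are placeholders.
-module(my_brod_client).
-export([client_config/0, start/0]).

%% brod_client options and kafka_protocol connection options in one proplist.
client_config() ->
    [ %% brod_client options
      {restart_delay_seconds, 10}
    , {reconnect_cool_down_seconds, 1}
    , {allow_topic_auto_creation, false}
    , {auto_start_producers, true}
    , {default_producer_config, [{max_retries, 5}]}
      %% connection options
    , {ssl, true}
    , {sasl, {scram_sha_256, "my_user", "my_password"}}
    , {connect_timeout, 5000}
    , {request_timeout, 30000}
    , {extra_sock_opts, [{sndbuf, 1 bsl 20}]}
    ].

%% Starts the brod application (if needed) and a client registered as my_client.
%% Returns ok | {error, Reason}.
start() ->
    {ok, _Apps} = application:ensure_all_started(brod),
    Endpoints = [{"kafka-1.example.com", 9093}, {"kafka-2.example.com", 9093}],
    brod:start_client(Endpoints, my_client, client_config()).
```

Because auto_start_producers is true here, produce calls through my_client would not need a prior start_producer/3 call.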

start_consumer(Client, TopicName, ConsumerConfig)

-spec start_consumer(client(), topic(), consumer_config()) -> ok | {error, any()}.

Dynamically start topic consumers and register them in the client.

A brod_consumer is started for each partition of the given topic. Note that you can have only one consumer per client-topic.

See brod_consumer:start_link/5 for details about consumer config.

You can read more about consumers in the overview.

start_producer(Client, TopicName, ProducerConfig)

-spec start_producer(client(), topic(), producer_config()) -> ok | {error, any()}.

Dynamically start a per-topic producer and register it in the client.

You have to start a producer for each topic you want to produce messages into, unless you have specified auto_start_producers = true when starting the client (in that case you don't have to call this function at all).

After starting the producer, you can call produce/5 and friends for producing messages.

You can read more about producers in the overview.

A client has to be already started before making this call (e.g. by calling start_client/3).

See brod_producer:start_link/4 for a list of available configuration options.

Example:
  > brod:start_producer(my_client, <<"my_topic">>, [{max_retries, 5}]).
  ok

stop()

-spec stop() -> ok.

Stop brod application.

stop(State)

Application behaviour callback

stop_client(Client)

-spec stop_client(client()) -> ok.

Stop a client.

subscribe(ConsumerPid, SubscriberPid, Options)

-spec subscribe(pid(), pid(), consumer_config()) -> ok | {error, any()}.

Subscribe to a data stream from the given consumer.

See subscribe/5 for more information.

subscribe(Client, SubscriberPid, Topic, Partition, Options)

-spec subscribe(client(), pid(), topic(), partition(), consumer_config()) ->
                   {ok, pid()} | {error, any()}.

Subscribe to a data stream from the given topic-partition.

Before calling this function, a client must already be started (by calling start_client/3; one client is enough for multiple topics), and a consumer must be started for the topic as well (by calling start_consumer/3).

The caller may specify a set of options that extend the consumer config. See brod_consumer:subscribe/3 for more information.

If {error, Reason} is returned, the caller may retry later.

{ok, ConsumerPid} is returned on success. The caller may want to monitor the consumer pid and re-subscribe should the ConsumerPid crash.

Upon successful subscription the subscriber process should expect messages of pattern: {ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}.

Use -include_lib("brod/include/brod.hrl") to access the records.

In case #kafka_fetch_error{} is received the subscriber should re-subscribe itself to resume the data stream.

To provide a backpressure mechanism, brod requires every message sent to a subscriber to be acknowledged by calling consume_ack/4 after it has been processed. If the subscriber has too many unacknowledged messages, the consumer stops fetching new ones so that the subscriber is not overwhelmed.

Only one process can be subscribed to a consumer. If you want to read from different positions (or at different paces), you have to create separate consumers (and thus also separate clients).
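
The subscription workflow described above (subscribe, process each #kafka_message_set{}, acknowledge with consume_ack/4, and re-subscribe after a #kafka_fetch_error{} or a consumer crash) can be sketched as a plain process. This assumes a client is already started and start_consumer/3 has been called for the topic. The module name my_subscriber, the RETRY_DELAY_MS macro and the printing in handle_message/1 are hypothetical; resuming with the begin_offset consumer option is one possible design choice, not the only one.

```erlang
%% Hypothetical subscriber sketch; assumes a started client and a consumer
%% started with brod:start_consumer/3 for Topic.
-module(my_subscriber).
-export([start_link/3]).

%% Defines #kafka_message_set{}, #kafka_message{} and #kafka_fetch_error{}.
-include_lib("brod/include/brod.hrl").

-define(RETRY_DELAY_MS, 1000).

start_link(Client, Topic, Partition) ->
    Pid = spawn_link(fun() -> subscribe(Client, Topic, Partition, undefined) end),
    {ok, Pid}.

%% (Re)subscribes this process; NextOffset is where to resume, if known.
subscribe(Client, Topic, Partition, NextOffset) ->
    Opts = case NextOffset of
               undefined -> [];
               _ -> [{begin_offset, NextOffset}]
           end,
    case brod:subscribe(Client, self(), Topic, Partition, Opts) of
        {ok, ConsumerPid} ->
            MRef = erlang:monitor(process, ConsumerPid),
            loop(Client, Topic, Partition, ConsumerPid, MRef, NextOffset);
        {error, _Reason} ->
            %% E.g. the consumer is not ready yet: retry later.
            timer:sleep(?RETRY_DELAY_MS),
            subscribe(Client, Topic, Partition, NextOffset)
    end.

loop(Client, Topic, Partition, ConsumerPid, MRef, NextOffset) ->
    receive
        {ConsumerPid, #kafka_message_set{messages = []}} ->
            loop(Client, Topic, Partition, ConsumerPid, MRef, NextOffset);
        {ConsumerPid, #kafka_message_set{messages = Messages}} ->
            lists:foreach(fun handle_message/1, Messages),
            #kafka_message{offset = LastOffset} = lists:last(Messages),
            %% Ack the last processed offset so the consumer keeps fetching.
            _ = brod:consume_ack(Client, Topic, Partition, LastOffset),
            loop(Client, Topic, Partition, ConsumerPid, MRef, LastOffset + 1);
        {ConsumerPid, #kafka_fetch_error{}} ->
            %% The data stream stopped: re-subscribe to resume it.
            erlang:demonitor(MRef, [flush]),
            subscribe(Client, Topic, Partition, NextOffset);
        {'DOWN', MRef, process, ConsumerPid, _Reason} ->
            %% The consumer died: give it time to restart, then re-subscribe.
            timer:sleep(?RETRY_DELAY_MS),
            subscribe(Client, Topic, Partition, NextOffset)
    end.

handle_message(#kafka_message{offset = Offset, key = Key, value = Value}) ->
    io:format("~p: ~p => ~p~n", [Offset, Key, Value]).
```

Acknowledging only the last offset of each message set is enough here, since an ack covers all messages up to that offset.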

sync_produce_request(CallRef)

-spec sync_produce_request(call_ref()) -> ok | {error, Reason :: any()}.

Equivalent to sync_produce_request(CallRef, infinity).

sync_produce_request(CallRef, Timeout)

-spec sync_produce_request(call_ref(), timeout()) -> ok | {error, Reason :: any()}.

Block until a sent produce request has been acknowledged by Kafka.

This way, you can turn asynchronous requests, made by produce/5 and friends, into synchronous ones.

Example:
  {ok, CallRef} = brod:produce(
    brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>). % returns immediately
  % the following call waits and returns after the ack is received or timed out
  brod:sync_produce_request(CallRef, 5_000).

sync_produce_request_offset(CallRef)

-spec sync_produce_request_offset(call_ref()) -> {ok, offset()} | {error, Reason :: any()}.

Equivalent to sync_produce_request_offset(CallRef, infinity).

sync_produce_request_offset(CallRef, Timeout)

-spec sync_produce_request_offset(call_ref(), timeout()) -> {ok, offset()} | {error, Reason :: any()}.

Same as sync_produce_request/2, but also returns the assigned offset.

See produce_sync_offset/5.
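
The two-step pattern (an asynchronous produce/5 followed by a blocking wait for the assigned offset) can be wrapped in a small helper. The module my_produce and the function produce_and_wait/6 are hypothetical; produce_sync_offset/5 offers a single-call alternative. The sketch assumes the client already has a producer for the topic (or was started with auto_start_producers set to true). Both calls run in the same process, because the ack is delivered to the process that called produce/5.

```erlang
%% Hypothetical helper module; assumes a started brod client with a producer
%% for Topic (or auto_start_producers = true).
-module(my_produce).
-export([produce_and_wait/6]).

%% Sends one message asynchronously, then blocks until Kafka acknowledges it
%% or Timeout (milliseconds or infinity) expires.
%% Returns {ok, Offset} with the offset assigned by Kafka, or {error, Reason}.
produce_and_wait(Client, Topic, Partition, Key, Value, Timeout) ->
    case brod:produce(Client, Topic, Partition, Key, Value) of
        {ok, CallRef} ->
            brod:sync_produce_request_offset(CallRef, Timeout);
        {error, Reason} ->
            {error, Reason}
    end.
```

Usage, with the client from the example above: {ok, Offset} = my_produce:produce_and_wait(brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>, 5000).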

transaction(Client, TxnId, Config)

-spec transaction(client(), transactional_id(), transaction_config()) -> {ok, transaction()}.

Equivalent to brod_transaction:start_link/3.

Start a new transaction. TxnId is the transactional id of the transaction.

txn_add_offsets(Transaction, ConsumerGroup, Offsets)

-spec txn_add_offsets(transaction(), group_id(), offsets_to_commit()) -> ok | {error, any()}.
Add the offsets consumed by a group to the transaction.

See also: brod_transaction:add_offsets/3.

txn_do(ProcessFun, Client, Options)

-spec txn_do(txn_function(), client(), txn_do_options()) -> {ok, pid()} | {error, any()}.
Execute the function in the context of a fetch-produce cycle with access to an open transaction.

See also: brod_transaction_processor:do/3.

txn_produce(Transaction, Topic, Partition, Batch)

-spec txn_produce(transaction(), topic(), partition(), batch_input()) -> {ok, offset()} | {error, any()}.
Produce the batch of messages to the indicated topic-partition synchronously.

See also: brod_transaction:produce/4.

txn_produce(Transaction, Topic, Partition, Key, Value)

-spec txn_produce(transaction(), topic(), partition(), key(), value()) ->
                     {ok, offset()} | {error, any()}.
Produce the message (key and value) to the indicated topic-partition synchronously.

See also: brod_transaction:produce/5.
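
Putting transaction/3, txn_produce/5 and the commit and abort functions together, an all-or-nothing write of several messages could look like the sketch below. The module my_txn and the function produce_atomically/5 are hypothetical, and passing [] as the transaction config assumes the default settings are acceptable. Any failure while producing or committing aborts the transaction before the original error is re-raised.

```erlang
%% Hypothetical helper module; assumes a started brod client and a Kafka
%% cluster that supports transactions.
-module(my_txn).
-export([produce_atomically/5]).

%% Produces every {Key, Value} pair in KVs to Topic/Partition within one
%% transaction, so either all of the messages are committed or none are.
produce_atomically(Client, TxnId, Topic, Partition, KVs) ->
    {ok, Txn} = brod:transaction(Client, TxnId, []),
    try
        lists:foreach(
          fun({Key, Value}) ->
                  {ok, _Offset} = brod:txn_produce(Txn, Topic, Partition, Key, Value)
          end, KVs),
        ok = brod:commit(Txn)
    catch
        Class:Reason:Stacktrace ->
            %% Abort (ignoring any abort failure), then re-raise the original error.
            _ = (catch brod:abort(Txn)),
            erlang:raise(Class, Reason, Stacktrace)
    end.
```

Usage: ok = my_txn:produce_atomically(my_client, <<"my-txn-id">>, <<"my_topic">>, 0, [{<<"k1">>, <<"v1">>}, {<<"k2">>, <<"v2">>}]).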

unsubscribe(ConsumerPid)

-spec unsubscribe(pid()) -> ok | {error, any()}.

Unsubscribe the current subscriber.

The subscriber is assumed to be self().

unsubscribe(ConsumerPid, SubscriberPid)

-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
Unsubscribe the given subscriber (SubscriberPid).

unsubscribe(Client, Topic, Partition)

-spec unsubscribe(client(), topic(), partition()) -> ok | {error, any()}.

Unsubscribe the current subscriber.

The subscriber is assumed to be self().

unsubscribe(Client, Topic, Partition, SubscriberPid)

-spec unsubscribe(client(), topic(), partition(), pid()) -> ok | {error, any()}.
Unsubscribe the given subscriber (SubscriberPid).