View Source brod_transaction (brod v3.18.0)

A brod_transaction is a process that orchestrates a set of producers to store messages within a transaction. It also supports committing offsets in the same transaction.

Simple produce sample:

   {ok, Tx} = brod_transaction:new(Client, TxId, []),
   lists:foreach(fun(Partition) ->
                     Key = rand(), Value = rand(),
                     {ok, _Offset} =
                     brod_transaction:produce(Tx,
                                              Topic,
                                              Partition,
                                              Key,
                                              Value)
                 end, Partitions),
   brod_transaction:commit(Tx),

Message-handling callback of a group subscriber that commits the consumed offset within a transaction:

   handle_message(Topic,
                  Partition,
                  #kafka_message{ offset = Offset
                                , key = Key
                                , value = Value},
                  #{ client := Client
                   , group_id := GroupId} =  State) ->
     {ok, Tx} = brod_transaction:new(Client),
     {ok, _ProducedOffset} = brod_transaction:produce(Tx, ?TOPIC_OUTPUT, Partition, Key, Value),
     ok = brod_transaction:add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
     ok = brod_transaction:commit(Tx),
 
     {ok, ack_no_commit, State}.

Link to this section Summary

Functions

Abort the transaction; after this, the gen_server will stop.
Add the offset consumed by a group to the transaction.
Commit the transaction; after this, the gen_server will stop.
Synchronously produce the batch of messages to the indicated topic-partition
Produce the message (key and value) to the indicated topic-partition synchronously.
Start a new transaction. TxId will be the id of the transaction. Config is a proplist; all values are optional: timeout: connection timeout in milliseconds; backoff_step: after each retry it will sleep for 2^Attempt * backoff_step milliseconds; max_retries: maximum number of retries.
Stop the transaction.

Link to this section Types

-type batch_input() :: kpro:batch_input().
-type call_ref() :: brod:call_ref().
-type client() :: client_id() | pid().
-type client_id() :: atom().
-type group_id() :: kpro:group_id().
-type key() :: brod:key().
-type offset() :: kpro:offset().
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type state() :: #state{}.
-type topic() :: kpro:topic().
-type transaction() :: pid().
-type transaction_config() ::
    [{timeout, non_neg_integer()} |
     {backoff_step, non_neg_integer()} |
     {max_retries, non_neg_integer()}].
-type transactional_id() :: kpro:transactional_id().
-type txn_ctx() :: kpro:txn_ctx().
-type value() :: brod:value().

Link to this section Functions

-spec abort(transaction()) -> ok | {error, any()}.
Abort the transaction; after this, the gen_server will stop.
Link to this function

add_offsets(Transaction, ConsumerGroup, Offsets)

View Source
-spec add_offsets(transaction(), group_id(), offsets_to_commit()) -> ok | {error, any()}.
Add the offset consumed by a group to the transaction.
-spec commit(transaction()) -> ok | {error, any()}.
Commit the transaction; after this, the gen_server will stop.
Link to this function

handle_call(Call, From, State)

View Source
Link to this function

handle_cast(Cast, State)

View Source
Link to this function

new(ClientPid, TxId, Config)

View Source
-spec new(pid(), transactional_id(), transaction_config()) -> {ok, transaction()}.

See also: start_link/3.

Link to this function

produce(Transaction, Topic, Partition, Batch)

View Source
-spec produce(transaction(), topic(), partition(), batch_input()) -> {ok, offset()} | {error, any()}.
Synchronously produce the batch of messages to the indicated topic-partition
Link to this function

produce(Transaction, Topic, Partition, Key, Value)

View Source
-spec produce(transaction(), topic(), partition(), key(), value()) -> {ok, offset()} | {error, any()}.
Produce the message (key and value) to the indicated topic-partition synchronously.
Link to this function

start_link(ClientPid, TxId, Config)

View Source
-spec start_link(pid(), transactional_id(), transaction_config()) -> {ok, pid()}.
Start a new transaction. TxId will be the id of the transaction. Config is a proplist; all values are optional: timeout: connection timeout in milliseconds; backoff_step: after each retry it will sleep for 2^Attempt * backoff_step milliseconds; max_retries: maximum number of retries.
-spec stop(transaction()) -> ok | {error, any()}.
Stop the transaction.
Link to this function

terminate(Reason, State)

View Source