View Source brod_transaction (brod v4.4.0)

A brod_transaction is a process that orchestrates a set of producers to store messages within a transaction. It also supports committing offsets in the same transaction.

Simple produce sample:

   {ok, Tx} = brod_transaction:new(Client, TxId, []),
   lists:foreach(fun(Partition) ->
                     Key = rand(), Value = rand(),
                     {ok, _Offset} =
                     brod_transaction:produce(Tx,
                                              Topic,
                                              Partition,
                                              Key,
                                              Value)
                 end, Partitions),
   brod_transaction:commit(Tx),

handle_message callback of a group subscriber, committing offsets within a transaction:

   handle_message(Topic,
                  Partition,
                  #kafka_message{ offset = Offset
                                , key = Key
                                , value = Value},
                  #{ client := Client
                   , group_id := GroupId} =  State) ->
     {ok, Tx} = brod_transaction:new(Client, TxId, []),
     {ok, _ProducedOffset} = brod_transaction:produce(Tx, ?TOPIC_OUTPUT, Partition, Key, Value),
     ok = brod_transaction:add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
     ok = brod_transaction:commit(Tx),
 
     {ok, ack_no_commit, State}.

Summary

Functions

Abort the transaction. After this, the gen_server will stop.

Add the offset consumed by a group to the transaction.

Commit the transaction. After this, the gen_server will stop.

Synchronously produce the batch of messages to the indicated topic-partition

Produce the message (key and value) to the indicated topic-partition synchronously.

Start a new transaction. TxId will be the id of the transaction. Config is a proplist; all values are optional: timeout (connection timeout in milliseconds), backoff_step (after each retry it will sleep for 2^Attempt * backoff_step milliseconds), and max_retries.

Stop the transaction.

Types

batch_input/0

-type batch_input() :: kpro:batch_input().

call_ref/0

-type call_ref() :: brod:call_ref().

client/0

-type client() :: client_id() | pid().

client_id/0

-type client_id() :: atom().

group_id/0

-type group_id() :: kpro:group_id().

key/0

-type key() :: brod:key().

offset/0

-type offset() :: kpro:offset().

offsets_to_commit/0

-type offsets_to_commit() :: kpro:offsets_to_commit().

partition/0

-type partition() :: kpro:partition().

state/0

-type state() ::
          #state{client_pid :: client(),
                 context :: txn_ctx(),
                 timeout :: pos_integer(),
                 sequences :: map(),
                 sent_partitions :: map(),
                 max_retries :: pos_integer(),
                 backoff_step :: pos_integer()}.

topic/0

-type topic() :: kpro:topic().

transaction/0

-type transaction() :: pid().

transaction_config/0

-type transaction_config() ::
          [{timeout, non_neg_integer()} |
           {backoff_step, non_neg_integer()} |
           {max_retries, non_neg_integer()}].

transactional_id/0

-type transactional_id() :: kpro:transactional_id().

txn_ctx/0

-type txn_ctx() :: kpro:txn_ctx().

value/0

-type value() :: brod:value().

Functions

abort(Transaction)

-spec abort(transaction()) -> ok | {error, any()}.

Abort the transaction. After this, the gen_server will stop.

add_offsets(Transaction, ConsumerGroup, Offsets)

-spec add_offsets(transaction(), group_id(), offsets_to_commit()) -> ok | {error, any()}.

Add the offset consumed by a group to the transaction.

commit(Transaction)

-spec commit(transaction()) -> ok | {error, any()}.

Commit the transaction. After this, the gen_server will stop.

handle_call(Call, From, State)

handle_cast(Cast, State)

init(_)

new(Client, TxId, Config)

-spec new(client(), transactional_id(), transaction_config()) -> {ok, transaction()}.

See also: start_link/3.

produce(Transaction, Topic, Partition, Batch)

-spec produce(transaction(), topic(), partition(), batch_input()) -> {ok, offset()} | {error, any()}.

Synchronously produce the batch of messages to the indicated topic-partition

produce(Transaction, Topic, Partition, Key, Value)

-spec produce(transaction(), topic(), partition(), key(), value()) -> {ok, offset()} | {error, any()}.

Produce the message (key and value) to the indicated topic-partition synchronously.

start_link(Client, TxId, Config)

-spec start_link(client(), transactional_id(), transaction_config()) -> {ok, pid()}.

Start a new transaction. TxId will be the id of the transaction. Config is a proplist; all values are optional: timeout (connection timeout in milliseconds), backoff_step (after each retry it will sleep for 2^Attempt * backoff_step milliseconds), and max_retries.

stop(Transaction)

-spec stop(transaction()) -> ok | {error, any()}.

Stop the transaction.

terminate(Reason, State)