View Source brod_transaction (brod v4.0.0)
A brod_transaction
is a process that orchestrates a set of producers to store messages within a transaction; it also supports committing offsets in the same transaction.
Simple produce sample:
{ok, Tx} = brod_transaction:new(Client, TxId, []),
lists:foreach(fun(Partition) ->
Key = rand(), Value = rand(),
{ok, _Offset} =
brod_transaction:produce(Tx,
Topic,
Partition,
Key,
Value)
end, Partitions),
brod_transaction:commit(Tx),
Sample handle_message callback of a group subscriber that commits offsets within a transaction:
handle_message(Topic,
Partition,
#kafka_message{ offset = Offset
, key = Key
, value = Value},
#{ client := Client
, group_id := GroupId} = State) ->
{ok, Tx} = brod_transaction:new(Client),
{ok, _ProducedOffset} = brod_transaction:produce(Tx, ?TOPIC_OUTPUT, Partition, Key, Value),
ok = brod_transaction:add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
ok = brod_transaction:commit(Tx),
{ok, ack_no_commit, State}.
Summary
Functions
Abort the transaction. After this, the gen_server will stop.
Add the offset consumed by a group to the transaction.
Commit the transaction. After this, the gen_server will stop.
See also: start_link/3.
Synchronously produce the batch of messages to the indicated topic-partition
Produce the message (key and value) to the indicated topic-partition synchronously.
Start a new transaction.
TxId will be the id of the transaction.
Config is a proplist; all values are optional:
- timeout: connection timeout in milliseconds
- backoff_step: after each retry it will sleep for 2^Attempt * backoff_step milliseconds
- max_retries
Stop the transaction.
Types
-type batch_input() :: kpro:batch_input().
-type call_ref() :: brod:call_ref().
-type client_id() :: atom().
-type group_id() :: kpro:group_id().
-type key() :: brod:key().
-type offset() :: kpro:offset().
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type state() :: #state{client_pid :: client(), context :: txn_ctx(), timeout :: pos_integer(), sequences :: map(), sent_partitions :: map(), max_retries :: pos_integer(), backoff_step :: pos_integer()}.
-type topic() :: kpro:topic().
-type transaction() :: pid().
-type transaction_config() :: [{timeout, non_neg_integer()} | {backoff_step, non_neg_integer()} | {max_retries, non_neg_integer()}].
-type transactional_id() :: kpro:transactional_id().
-type txn_ctx() :: kpro:txn_ctx().
-type value() :: brod:value().
Functions
-spec abort(transaction()) -> ok | {error, any()}.
-spec add_offsets(transaction(), group_id(), offsets_to_commit()) -> ok | {error, any()}.
-spec commit(transaction()) -> ok | {error, any()}.
-spec new(pid(), transactional_id(), transaction_config()) -> {ok, transaction()}.
See also: start_link/3.
-spec produce(transaction(), topic(), partition(), batch_input()) -> {ok, offset()} | {error, any()}.
-spec start_link(pid(), transactional_id(), transaction_config()) -> {ok, pid()}.
TxId will be the id of the transaction.
Config is a proplist; all values are optional:
- timeout: connection timeout in milliseconds
- backoff_step: after each retry it will sleep for 2^Attempt * backoff_step milliseconds
- max_retries
-spec stop(transaction()) -> ok | {error, any()}.