brod_transaction_processor (brod v4.3.3)
brod_transaction_processor allows the execution of a function in the context of a transaction. It abstracts the usage of a group subscriber that reads and writes within a transaction in each fetch cycle. For example, the following snippets are equivalent:
-------------------------------------------------
function_that_does_something(Messages, ...) ->
    write_some_messages_into_some_topic(Messages, ...),
    write_some_other_messages_into_yet_another_topic(Messages, ...).

handle_message(Topic, Partition, Messages, State) ->
    {ok, Tx} = brod:transaction(...),            % opens a transaction
    function_that_does_something(Messages, ...), % adds the writes to the transaction
    ok = brod:txn_add_offsets(...),              % adds the offsets to the transaction
    ok = brod:commit(Tx),                        % commits the transaction
    {ok, ack_no_commit, State}.
-------------------------------------------------
brod_transaction_processor:do(
    fun(Transaction, Messages) ->
        write_some_messages_into_some_topic(Messages, ...),
        write_some_other_messages_into_yet_another_topic(Messages, ...)
    end,
    ...)
-------------------------------------------------
Summary
Functions
do/3 executes the ProcessFunction within the context of a transaction. Options is a map that can include:
    group_config: the configuration for the group subscriber.
    consumer_config: the configuration for the consumer.
    transaction_config: the configuration for the transaction.
    group_id: the subscriber group id.
    topics: the topics to fetch from.
Types
-type client_id() :: atom().
-type do_options() :: #{group_config => proplists:proplist(), consumer_config => proplists:proplist(), transaction_config => proplists:proplist(), group_id => binary(), topics => [binary()]}.
-type message_set() :: #kafka_message_set{topic :: brod:topic(), partition :: brod:partition(), high_wm_offset :: integer(), messages :: [brod:message()] | kpro:incomplete_batch()}.
-type process_function() :: fun((transaction(), message_set()) -> ok | {error, any()}).
-type transaction() :: brod_transaction:transaction().
Functions
-spec do(process_function(), client(), do_options()) -> {ok, pid()} | {error, any()}.
Executes the ProcessFunction within the context of a transaction. Options is a map that can include:
    group_config: the configuration for the group subscriber.
    consumer_config: the configuration for the consumer.
    transaction_config: the configuration for the transaction.
    group_id: the subscriber group id.
    topics: the topics to fetch from.
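As an illustration of the options listed above, a do_options() map could be built roughly as follows. This is only a sketch: the group id, the topic name, and the choice of begin_offset are hypothetical values picked for the example, and the empty lists simply leave the group subscriber and transaction settings at whatever defaults brod applies.

```erlang
%% Hypothetical values throughout; adjust to your own cluster and topics.
Options = #{ group_id           => <<"example-processor-group">>   % assumed group name
           , topics             => [<<"example-input-topic">>]     % assumed topic name
           , consumer_config    => [{begin_offset, earliest}]      % start from the oldest message
           , group_config       => []                              % keep group subscriber defaults
           , transaction_config => []                              % keep transaction defaults
           }.
```

The resulting map is what would be passed as the third argument of do/3.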
FizzBuzz sample:
fizz_buzz(N) when (N rem 15) == 0 -> "FizzBuzz";
fizz_buzz(N) when (N rem 3) == 0 -> "Fizz";
fizz_buzz(N) when (N rem 5) == 0 -> "Buzz";
fizz_buzz(N) -> N.

brod_transaction_processor:do(
    fun(Transaction, #kafka_message_set{ topic = _Topic
                                       , partition = Partition
                                       , messages = Messages} = _MessageSet) ->
        %% Transform every fetched message, keeping its key.
        FizzBuzzed =
            lists:map(fun(#kafka_message{ key = Key
                                        , value = Value}) ->
                          #{ key => Key
                           , value => fizz_buzz(Value)}
                      end, Messages),

        %% Write the results to the output topic as part of the transaction.
        brod:txn_produce(Transaction, ?OUTPUT_TOPIC, Partition, FizzBuzzed),

        ok
    end,
    Client,
    #{ topics => [?INPUT_TOPIC]
     , group_id => ?PROCESSOR_GROUP_ID}).
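The sample above hands the message value directly to fizz_buzz/1. Message values fetched through brod normally arrive as binaries, so a variant that decodes the value before applying `rem` and encodes the result again may be closer to what a real processor needs. The sketch below is one possible shape, not part of brod: the module name fizz_buzz_sketch and the function transform/1 are hypothetical, and it assumes every value is a binary holding a decimal integer such as <<"15">>.

```erlang
-module(fizz_buzz_sketch).
-export([fizz_buzz/1, transform/1]).

%% Classic FizzBuzz on an integer. Returns a binary so the result can be
%% used directly as a message value.
fizz_buzz(N) when N rem 15 =:= 0 -> <<"FizzBuzz">>;
fizz_buzz(N) when N rem 3 =:= 0  -> <<"Fizz">>;
fizz_buzz(N) when N rem 5 =:= 0  -> <<"Buzz">>;
fizz_buzz(N)                     -> integer_to_binary(N).

%% Assumption: Value is a binary containing a decimal integer.
%% binary_to_integer/1 raises badarg for anything else.
transform(Value) when is_binary(Value) ->
    fizz_buzz(binary_to_integer(Value)).
```

Inside the process function, `fizz_buzz(Value)` could then be swapped for `fizz_buzz_sketch:transform(Value)`.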