View Source brod_transaction_processor (brod v4.0.0)

brod_transaction_processor allows the execution of a function in the context of a transaction. It abstracts the usage of a group subscriber reading and writing using a transaction in each fetch cycle. For example, the following snippets are equivalent

-------------------------------------------------

function_that_does_something(Messages, ...) -> write_some_messages_into_some_topic(Messages, ...), write_some_other_messages_into_yet_another_topic(Messages, ...).

handle_message(Topic, Partition, Messages, State) -> {ok, Tx} = brod:transaction(...) % opens a transaction function_that_does_something(Messages, ...) % adds the writes to the transaction ok = brod:txn_add_offsets(...) % add offsets to the transsaction ok = btrod:commit(Tx) % commit {ok, ack_no_commit, State}

-------------------------------------------------

brod_transaction_processor:do( fun(Transaction, Messages) -> write_some_messages_into_some_topic(Messages, ...), write_some_other_messages_into_yet_another_topic(Messages, ...) end, ...)

-------------------------------------------------

Summary

Functions

executes the ProcessFunction within the context of a transaction. Options is a map that can include group_config as the configuration for the group suscriber. consumer_config as the configuration for the consumer suscriber. transaction_config transacction config. group_id as the subscriber group id. topics topics to fetch from.

Types

-type client() :: client_id() | pid().
-type client_id() :: atom().
-type do_options() ::
          #{group_config => proplists:proplist(),
            consumer_config => proplists:proplist(),
            transaction_config => proplists:proplist(),
            group_id => binary(),
            topics => [binary()]}.
-type message_set() ::
          #kafka_message_set{topic :: brod:topic(),
                             partition :: brod:partition(),
                             high_wm_offset :: integer(),
                             messages :: [brod:message()] | kpro:incomplete_batch()}.
-type process_function() :: fun((transaction(), message_set()) -> ok | {error, any()}).
-type transaction() :: brod_transaction:transaction().

Functions

Link to this function

do(ProcessFun, Client, Opts)

View Source
-spec do(process_function(), client(), do_options()) -> {ok, pid()} | {error, any()}.

executes the ProcessFunction within the context of a transaction. Options is a map that can include group_config as the configuration for the group suscriber. consumer_config as the configuration for the consumer suscriber. transaction_config transacction config. group_id as the subscriber group id. topics topics to fetch from.

FizzBuzz sample:

fizz_buzz(N) when (N rem 15) == 0 -> "FizzBuzz" fizz_buzz(N) when (N rem 3) == 0 -> "Fizz" fizz_buzz(N) when (N rem 5) == 0 -> "Buzz"; fizz_buzz(N) -> N end.

brod_transaction_processor:do( fun(Transaction, #kafka_message_set{ topic = _Topic , partition = Partition , messages = Messages} = _MessageSet) -> FizzBuzzed = lists:map(fun(#kafka_message{ key = Key , value = Value}) -> #{ key => Key , value => fizz_buzz(Value)} end, Messages),

brod:txn_produce(Transaction, ?OUTPUT_TOPIC, Partition, FizzBuzzed),

ok end, Client, #{ topics => [?INPUT_TOPIC] , group_id => ?PROCESSOR_GROUP_ID}).
Link to this function

get_committed_offsets(GroupId, TPs, State)

View Source
Link to this function

handle_message(Topic, Partition, Kafka_message_set, State)

View Source