brod_topic_subscriber behaviour (brod v4.0.0)

A topic subscriber is a gen_server which subscribes to all or a given set of partition consumers (pollers) of a given topic and calls the user-defined callback functions for message processing.

Callbacks are documented in the source code of this module.

Summary

Functions

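ack(Pid, Partition, Offset)

Acknowledge that a message has been successfully consumed.

start_link(Config)

Start (link) a topic subscriber which receives and processes the messages from a given partition set.

start_link(Client, Topic, Partitions, ConsumerConfig, CbModule, CbInitArg)

Equivalent to start_link(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).

start_link(Client, Topic, Partitions, ConsumerConfig, MessageType, CbModule, CbInitArg)

Start (link) a topic subscriber which receives and processes the messages or message sets from the given partition set. Use the atom all to subscribe to all partitions. Messages are handled by calling CbModule:handle_message/3.

start_link(Client, Topic, Partitions, ConsumerConfig, CommittedOffsets, MessageType, CbFun, CbInitialState)

Start (link) a topic subscriber which receives and processes the messages from the given partition set. Use the atom all to subscribe to all partitions. Messages are handled by calling the callback function.

stop(Pid)

Stop the topic subscriber.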

Types

-type ack_ref() :: {brod:partition(), brod:offset()}.
-type cb_fun() :: fun((brod:partition(), brod:message() | brod:message_set(), cb_state()) -> cb_ret()).
-type cb_ret() :: {ok, cb_state()} | {ok, ack, cb_state()}.
-type cb_state() :: term().
-type committed_offsets() :: [{brod:partition(), brod:offset()}].
-type consumer() ::
          #consumer{partition :: brod:partition(),
                    consumer_pid :: undefined | pid() | {down, string(), any()},
                    consumer_mref :: undefined | reference(),
                    acked_offset :: undefined | brod:offset(),
                    last_offset :: undefined | brod:offset()}.
-type state() ::
          #state{client :: brod:client(),
                 client_mref :: reference(),
                 topic :: brod:topic(),
                 consumers :: [consumer()],
                 cb_module :: module(),
                 cb_state :: cb_state(),
                 message_type :: message | message_set}.
-type topic_subscriber_config() ::
          #{client := brod:client(),
            topic := brod:topic(),
            cb_module := module(),
            init_data => term(),
            message_type => message | message_set,
            consumer_config => brod:consumer_config(),
            partitions => all | [brod:partition()]}.

Callbacks

handle_info/2 (optional)

-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}.

handle_message/3

-callback handle_message(brod:partition(), brod:message() | brod:message_set(), cb_state()) -> cb_ret().

init/2

-callback init(brod:topic(), term()) -> {ok, committed_offsets(), cb_state()}.

terminate/2

-callback terminate(_Reason, cb_state()) -> _.
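
The callback signatures above can be illustrated with a minimal callback module. This is only a sketch: the module name my_subscriber and the shape of its state (a map holding a message counter) are assumptions made for illustration, not part of brod. It reports no committed offsets from init/2 and acknowledges every message or message set immediately by returning {ok, ack, State}.

```erlang
-module(my_subscriber).
-behaviour(brod_topic_subscriber).

-export([init/2, handle_message/3, handle_info/2, terminate/2]).

%% Called once when the subscriber starts. The second element is the
%% committed_offsets() list; it is empty here because this sketch does
%% not persist offsets anywhere.
init(_Topic, InitData) ->
    {ok, [], #{init_data => InitData, count => 0}}.

%% Called for each message (or message set, depending on message_type).
%% Returning {ok, ack, State} acknowledges it right away.
handle_message(_Partition, _MessageOrSet, State = #{count := N}) ->
    {ok, ack, State#{count := N + 1}}.

%% Optional callback: ignore any other Erlang message.
handle_info(_Msg, State) ->
    {noreply, State}.

%% Nothing to clean up in this sketch.
terminate(_Reason, _State) ->
    ok.
```

Returning {ok, State} instead of {ok, ack, State} leaves the message unacknowledged until ack/3 is called.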

Functions

ack(Pid, Partition, Offset)

-spec ack(pid(), brod:partition(), brod:offset()) -> ok.

Acknowledge that a message has been successfully consumed.
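
As a rough sketch of deferred acknowledgement, a callback can return {ok, State} (no ack) and let a worker call ack/3 once processing is done. The module name async_ack_subscriber and the helper process/1 are hypothetical. The sketch also assumes the subscriber was started with message_type set to message, so that handle_message/3 receives a single #kafka_message{} record, and that callbacks run inside the subscriber process, so self() is the pid to acknowledge against. Ordering of acks across workers is not addressed here.

```erlang
-module(async_ack_subscriber).
-behaviour(brod_topic_subscriber).

%% Brings in the #kafka_message{} record definition.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/3]).

init(_Topic, _InitData) ->
    {ok, [], #{}}.

handle_message(Partition, #kafka_message{offset = Offset, value = Value}, State) ->
    %% Assumption: the callback runs in the subscriber process.
    Subscriber = self(),
    spawn_link(fun() ->
        ok = process(Value),
        %% Acknowledge only after the work has succeeded.
        ok = brod_topic_subscriber:ack(Subscriber, Partition, Offset)
    end),
    %% No ack yet: return {ok, State} rather than {ok, ack, State}.
    {ok, State}.

%% Hypothetical placeholder for real message processing.
process(_Value) ->
    ok.
```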

start_link(Config)

-spec start_link(topic_subscriber_config()) -> {ok, pid()} | {error, _}.

Start (link) a topic subscriber which receives and processes the messages from a given partition set.

Possible Config keys:

  • client: Client ID (or pid, but not recommended) of the brod client. Mandatory.
  • topic: Topic to consume from. Mandatory.
  • cb_module: Callback module which implements the callback functions for message processing. Mandatory.
  • consumer_config: Configuration for the partition consumers (brod:consumer_config()), as accepted by brod_topic_subscriber:start_link/6. Optional, defaults to [].
  • message_type: The type of message that is going to be handled by the callback module. Can be either message or message_set. Optional, defaults to message_set.
  • init_data: The term() that is going to be passed to CbModule:init/2 when initializing the subscriber. Optional, defaults to undefined.
  • partitions: List of partitions to consume from, or the atom all. Optional, defaults to all.
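
The keys above can be put together as in the following sketch. All values are placeholders: it assumes a brod client has already been started under the name my_client, that <<"my-topic">> exists, and that my_subscriber is a hypothetical module implementing this behaviour. The {begin_offset, earliest} entry is one possible consumer option, chosen here only as an example.

```erlang
-module(subscriber_example).

-export([config/0, start/0]).

%% Build a topic_subscriber_config() map. Only client, topic and
%% cb_module are mandatory; the remaining keys show the optional ones.
config() ->
    #{client => my_client,                 % assumed to be started already
      topic => <<"my-topic">>,             % placeholder topic name
      cb_module => my_subscriber,          % hypothetical callback module
      init_data => undefined,              % passed to my_subscriber:init/2
      message_type => message,             % one message per callback call
      consumer_config => [{begin_offset, earliest}],
      partitions => all}.

%% Start the subscriber linked to the calling process.
start() ->
    brod_topic_subscriber:start_link(config()).
```

On success, start/0 returns {ok, Pid}; that pid is the one to pass to ack/3 and stop/1.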

start_link(Client, Topic, Partitions, ConsumerConfig, CbModule, CbInitArg)

This function is deprecated. Please use start_link/1 instead.
-spec start_link(brod:client(),
                 brod:topic(),
                 all | [brod:partition()],
                 brod:consumer_config(),
                 module(),
                 term()) ->
                    {ok, pid()} | {error, any()}.

Equivalent to start_link(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).

start_link(Client, Topic, Partitions, ConsumerConfig, MessageType, CbModule, CbInitArg)

This function is deprecated. Please use start_link/1 instead.
-spec start_link(brod:client(),
                 brod:topic(),
                 all | [brod:partition()],
                 brod:consumer_config(),
                 message | message_set,
                 module(),
                 term()) ->
                    {ok, pid()} | {error, any()}.

Start (link) a topic subscriber which receives and processes the messages or message sets from the given partition set. Use the atom all to subscribe to all partitions. Messages are handled by calling CbModule:handle_message/3.

start_link(Client, Topic, Partitions, ConsumerConfig, CommittedOffsets, MessageType, CbFun, CbInitialState)

This function is deprecated. Please use start_link/1 instead.
-spec start_link(brod:client(),
                 brod:topic(),
                 all | [brod:partition()],
                 brod:consumer_config(),
                 committed_offsets(),
                 message | message_set,
                 cb_fun(),
                 cb_state()) ->
                    {ok, pid()} | {error, any()}.

Start (link) a topic subscriber which receives and processes the messages from the given partition set. Use atom all to subscribe to all partitions. Messages are handled by calling the callback function.

NOTE: CommittedOffsets are the offsets for the messages that have been successfully processed (acknowledged), not the begin-offset to start fetching from.
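
To make the distinction in the note concrete, the sketch below derives the next offset to fetch from a committed_offsets() list. It rests on an assumption drawn from the note rather than stated by it: that fetching resumes at the offset immediately after each committed one. The module and function names are hypothetical and not part of brod.

```erlang
-module(committed_offsets_example).

-export([begin_offsets/1]).

%% Map each committed (already acknowledged) offset to the offset that
%% would be fetched next, assuming consumption resumes at Offset + 1.
begin_offsets(CommittedOffsets) ->
    [{Partition, Offset + 1} || {Partition, Offset} <- CommittedOffsets].
```

For example, under this assumption a committed offset of 41 on partition 0 means offset 41 has been processed and offset 42 is the next one to fetch.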

stop(Pid)

-spec stop(pid()) -> ok.

Stop the topic subscriber.