brod_topic_subscriber behaviour (brod v4.3.3)
A topic subscriber is a gen_server which subscribes to all or a given set of partition consumers (pollers) of a given topic and calls the user-defined callback functions for message processing.
Callbacks are documented in the source code of this module.
Summary
Functions
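start_link(Config)
Start (link) a topic subscriber which receives and processes the messages from a given partition set.
start_link(Client, Topic, Partitions, ConsumerConfig, MessageType, CbModule, CbInitArg)
Start (link) a topic subscriber which receives and processes the messages from the given partition set. Use atom all to subscribe to all partitions. Messages are handled by calling CbModule:handle_message.
start_link(Client, Topic, Partitions, ConsumerConfig, CommittedOffsets, MessageType, CbFun, CbInitialState)
Start (link) a topic subscriber which receives and processes the messages from the given partition set. Use atom all to subscribe to all partitions. Messages are handled by calling the callback function.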
Types
-type ack_ref() :: {brod:partition(), brod:offset()}.
-type cb_fun() :: fun((brod:partition(), brod:message() | brod:message_set(), cb_state()) -> cb_ret()).
-type cb_state() :: term().
-type committed_offsets() :: [{brod:partition(), brod:offset()}].
-type consumer() :: #consumer{partition :: brod:partition(), consumer_pid :: undefined | pid() | {down, string(), any()}, consumer_mref :: undefined | reference(), acked_offset :: undefined | brod:offset(), last_offset :: undefined | brod:offset()}.
-type state() :: #state{client :: brod:client(), client_mref :: reference(), topic :: brod:topic(), consumers :: [consumer()], cb_module :: module(), cb_state :: cb_state(), message_type :: message | message_set}.
-type topic_subscriber_config() :: #{client := brod:client(), topic := brod:topic(), cb_module := module(), init_data => term(), message_type => message | message_set, consumer_config => brod:consumer_config(), partitions => all | [brod:partition()]}.
Callbacks
-callback handle_message(brod:partition(), brod:message() | brod:message_set(), cb_state()) -> cb_ret().
-callback init(brod:topic(), term()) -> {ok, committed_offsets(), cb_state()}.
-callback terminate(_Reason, cb_state()) -> _.
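As an illustration of the three callbacks above, here is a minimal sketch of a callback module. The module name counting_subscriber and the shape of its state are hypothetical. The {ok, ack, State} return value assumes that cb_ret() is {ok, cb_state()} | {ok, ack, cb_state()}, which is not shown in the type list on this page.

```erlang
-module(counting_subscriber).
-behaviour(brod_topic_subscriber).

-export([init/2, handle_message/3, terminate/2]).

%% Called once when the subscriber starts. InitData is the term given
%% as init_data (or CbInitArg). The empty list means that no offsets
%% have been acknowledged yet.
init(_Topic, InitData) ->
    State = #{counts => #{}, init_data => InitData},
    {ok, [], State}.

%% Called for each message (or message set, depending on message_type).
%% This sketch only counts deliveries per partition and does not
%% inspect the payload.
handle_message(Partition, _Message, #{counts := Counts} = State) ->
    NewCounts = maps:update_with(Partition, fun(N) -> N + 1 end, 1, Counts),
    %% Assumption: returning 'ack' acknowledges the message right away.
    {ok, ack, State#{counts := NewCounts}}.

%% Called when the subscriber stops; nothing to clean up here.
terminate(_Reason, _State) ->
    ok.
```

Because the callbacks are plain functions over the state term, they can be exercised directly in a shell without a running Kafka cluster.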
Functions
-spec ack(pid(), brod:partition(), brod:offset()) -> ok.
-spec start_link(topic_subscriber_config()) -> {ok, pid()} | {error, _}.
Start (link) a topic subscriber which receives and processes the messages from a given partition set.
Possible Config keys:
client: Client ID (or pid, but not recommended) of the brod client. Mandatory
topic: Topic to consume from. Mandatory
cb_module: Callback module which should have the callback functions implemented for message processing. Mandatory
consumer_config: For partition consumer, brod_consumer:start_link/5. Optional, defaults to []
message_type: The type of message that is going to be handled by the callback module. Can be either message or message_set. Optional, defaults to message_set
init_data: The term() that is going to be passed to CbModule:init/2 when initializing the subscriber. Optional, defaults to undefined
partitions: List of partitions to consume from, or atom all. Optional, defaults to all
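The configuration keys above could be put together as in the following sketch. The client ID my_client, the topic name, and the callback module my_cb_module are hypothetical, and the client is assumed to have been started elsewhere (for example with brod:start_client/3). The begin_offset consumer option is only an illustration of what consumer_config may carry.

```erlang
-module(subscriber_example).
-export([config/0, start/0]).

%% Build a topic_subscriber_config() map. Only client, topic and
%% cb_module are mandatory; the remaining keys override the defaults.
config() ->
    #{client => my_client,                 %% hypothetical client ID
      topic => <<"my-topic">>,             %% hypothetical topic
      cb_module => my_cb_module,           %% hypothetical callback module
      init_data => undefined,
      message_type => message,
      consumer_config => [{begin_offset, earliest}],
      partitions => [0, 1]}.

%% Start the subscriber linked to the calling process.
start() ->
    brod_topic_subscriber:start_link(config()).
```

Keeping the map in its own function makes it easy to inspect or adjust the configuration before the subscriber is started.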
start_link(Client, Topic, Partitions, ConsumerConfig, CbModule, CbInitArg)
-spec start_link(brod:client(), brod:topic(), all | [brod:partition()], brod:consumer_config(), module(), term()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).
start_link(Client, Topic, Partitions, ConsumerConfig, MessageType, CbModule, CbInitArg)
-spec start_link(brod:client(), brod:topic(), all | [brod:partition()], brod:consumer_config(), message | message_set, module(), term()) -> {ok, pid()} | {error, any()}.
Start (link) a topic subscriber which receives and processes the messages from the given partition set. Use atom all to subscribe to all partitions. Messages are handled by calling CbModule:handle_message.
start_link(Client, Topic, Partitions, ConsumerConfig, CommittedOffsets, MessageType, CbFun, CbInitialState)
-spec start_link(brod:client(), brod:topic(), all | [brod:partition()], brod:consumer_config(), committed_offsets(), message | message_set, cb_fun(), cb_state()) -> {ok, pid()} | {error, any()}.
Start (link) a topic subscriber which receives and processes the messages from the given partition set. Use atom all to subscribe to all partitions. Messages are handled by calling the callback function.
CommittedOffsets are the offsets for the messages that have been successfully processed (acknowledged), not the begin-offset to start fetching from.
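The distinction between a committed offset and a begin-offset can be sketched as follows. The module name, the helper to_committed/1, the client ID my_client and the topic name are all hypothetical. The helper assumes the caller tracks the next offset to fetch per partition and converts it to the offset of the last processed message. The callback fun assumes that {ok, ack, State} is a valid cb_ret().

```erlang
-module(fun_subscriber_example).
-export([to_committed/1, start/1]).

%% Convert "next offset to fetch" bookkeeping into committed offsets,
%% that is, the offset of the last message already processed.
%% Partitions whose next offset is 0 have nothing committed yet
%% and are left out.
to_committed(NextOffsets) ->
    [{Partition, Next - 1} || {Partition, Next} <- NextOffsets, Next > 0].

%% Start a subscriber on all partitions with a callback fun that
%% counts messages in its state.
start(NextOffsets) ->
    CbFun = fun(_Partition, _Message, Count) -> {ok, ack, Count + 1} end,
    brod_topic_subscriber:start_link(
      my_client, <<"my-topic">>, all, [],
      to_committed(NextOffsets), message, CbFun, 0).
```

Passing the next-to-fetch offset unchanged as a committed offset would presumably cause one message per partition to be skipped, which is why the conversion is kept explicit here.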
-spec stop(pid()) -> ok.