brod_consumer (brod v4.3.2)
Kafka consumers work in poll mode. In brod, brod_consumer is the poller: it constantly asks for more data from the Kafka node that is the leader for the given partition.
By subscribing to brod_consumer, a process receives the polled message sets (not individual messages) in its mailbox. The shape of the message is documented at brod:subscribe/5.
Messages processed by the subscriber have to be acked by calling ack/2 (or brod:consume_ack/4) to notify the consumer that all messages up to the acknowledged offset have been processed, so that more messages can be fetched and sent to the subscriber without overwhelming it.
Each consumer can have only one subscriber.
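To make the subscribe-then-ack flow concrete, below is a minimal sketch of a subscriber process. The module name my_subscriber, the client name my_client, and the assumption that consumers were already started via brod:start_consumer/3 are illustrative only, not part of this module's API.

%% Minimal subscriber sketch (hypothetical names my_subscriber / my_client).
%% Assumes brod:start_consumer/3 has already been called for the topic.
-module(my_subscriber).
-include_lib("brod/include/brod.hrl").
-export([start/2]).

start(Topic, Partition) ->
  {ok, ConsumerPid} = brod:subscribe(my_client, self(), Topic, Partition, []),
  loop(ConsumerPid).

loop(ConsumerPid) ->
  receive
    {ConsumerPid, #kafka_message_set{messages = Messages}} ->
      %% Process the whole message set, then ack the last offset so the
      %% consumer can fetch (and prefetch) more.
      lists:foreach(
        fun(#kafka_message{offset = Offset, value = Value}) ->
            io:format("~p: ~s~n", [Offset, Value])
        end, Messages),
      #kafka_message{offset = LastOffset} = lists:last(Messages),
      ok = brod_consumer:ack(ConsumerPid, LastOffset),
      loop(ConsumerPid)
  end.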
See the overview for some more information and examples.
Summary
Functions
Enable/disable debugging on the consumer process.
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
Start (link) a partition consumer.
Subscribe or resubscribe on messages from a partition.
Types
-type bytes() :: non_neg_integer().
-type config() :: brod:consumer_config().
-type isolation_level() :: kpro:isolation_level().
-type offset() :: brod:offset().
-type offset_reset_policy() :: reset_by_subscriber | reset_to_earliest | reset_to_latest.
-type offset_time() :: brod:offset_time().
-type partition() :: brod:partition().
-type pending_acks() :: #pending_acks{count :: non_neg_integer(), bytes :: bytes(), queue :: pending_queue()}.
-type pending_queue() :: queue:queue(pending()).
-type state() :: #state{client_pid :: ignore | pid(), bootstrap :: ignore | get | brod:bootstrap(), connection :: undefined | pid(), topic :: binary(), partition :: integer(), begin_offset :: offset_time(), max_wait_time :: integer(), min_bytes :: bytes(), max_bytes_orig :: bytes(), sleep_timeout :: integer(), prefetch_count :: integer(), last_req_ref :: undefined | reference(), subscriber :: undefined | pid(), subscriber_mref :: undefined | reference(), pending_acks :: pending_acks(), is_suspended :: boolean(), offset_reset_policy :: offset_reset_policy(), avg_bytes :: number(), max_bytes :: bytes(), size_stat_window :: non_neg_integer(), prefetch_bytes :: non_neg_integer(), connection_mref :: undefined | reference(), isolation_level :: isolation_level()}.
-type topic() :: brod:topic().
Functions
-spec ack(pid(), brod:offset()) -> ok.
Enable/disable debugging on the consumer process.
debug(Pid, print) prints debug info to stdout.
debug(Pid, File) prints debug info to a file File.
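For example (a sketch; the consumer pid is assumed to be looked up via brod:get_consumer/3 under a hypothetical client named my_client):

%% Enable debug printing on a running consumer; names are placeholders.
{ok, ConsumerPid} = brod:get_consumer(my_client, <<"my_topic">>, 0),
brod_consumer:debug(ConsumerPid, print).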
-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config(), [any()]) -> {ok, pid()} | {error, any()}.
Start (link) a partition consumer.
Possible configs:
min_bytes (optional, default = 0)
Minimal bytes to fetch in a batch of messages.

max_bytes (optional, default = 1MB)
Maximum bytes to fetch in a batch of messages. NOTE: this value might be expanded to retry when it is not enough to fetch even a single message, then slowly shrunk back to the given value.

max_wait_time (optional, default = 10000 ms)
Maximum time in milliseconds allowed for the broker to collect min_bytes of messages in a fetch response.

sleep_timeout (optional, default = 1000 ms)
Allow the consumer process to sleep this amount of ms if Kafka replied with an 'empty' message set.

prefetch_count (optional, default = 10)
The window size (number of messages) allowed to fetch ahead.

prefetch_bytes (optional, default = 100KB)
The total number of bytes allowed to fetch ahead. brod_consumer is greedy: it only stops fetching more messages when the number of unacked messages has exceeded prefetch_count AND the unacked total volume has exceeded prefetch_bytes.

begin_offset (optional, default = latest)
The offset from which to begin fetch requests. A subscriber may consume and process messages, persist the associated offset to persistent storage, then start (or restart) from last_processed_offset + 1 as the begin_offset to proceed. The offset has to already exist at the time of calling.

offset_reset_policy (optional, default = reset_by_subscriber)
How to reset begin_offset if an OffsetOutOfRange exception is received.
reset_by_subscriber: the consumer is suspended (is_suspended=true in state) and waits for the subscriber to re-subscribe with a new begin_offset option.
reset_to_earliest: consume from the earliest offset.
reset_to_latest: consume from the last available offset.

size_stat_window (optional, default = 5)
The moving-average window size (number of messages) used to calculate the average message size. The average message size is used to shrink max_bytes in fetch requests after it has been expanded to fetch a large message. Use 0 to immediately shrink back to the original max_bytes from config. This size estimation allows users to set a relatively small max_bytes, then let it dynamically adjust to a number around PrefetchCount * AverageSize.

isolation_level (optional, default = read_committed)
Level to control which transaction records are exposed to the consumer. Two values are allowed: read_uncommitted to retrieve all records, independently of the transaction outcome (if any), and read_committed to get only the records from committed transactions.

share_leader_conn (optional, default = false)
Whether or not to share the partition leader connection with other producers or consumers. Set to true to use fewer TCP connections towards Kafka, but this may lead to higher fetch latency: Kafka can only accumulate messages for the oldest fetch request, so later requests behind it may get blocked until max_wait_time expires for the oldest one.
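For illustration, a consumer config is a proplist of the options above. The sketch below (the client name my_client and the topic are hypothetical, and the values are examples rather than recommendations) passes such a config to brod:start_consumer/3; the same proplist can also be given to start_link/4,5 directly.

%% Hypothetical example config; my_client and <<"my_topic">> are placeholders.
ConsumerConfig = [ {min_bytes, 0}
                 , {max_bytes, 1048576}          %% 1 MB
                 , {max_wait_time, 10000}        %% ms
                 , {sleep_timeout, 1000}         %% ms
                 , {prefetch_count, 10}
                 , {prefetch_bytes, 102400}      %% 100 KB
                 , {begin_offset, earliest}
                 , {offset_reset_policy, reset_to_earliest}
                 , {isolation_level, read_committed}
                 ],
ok = brod:start_consumer(my_client, <<"my_topic">>, ConsumerConfig).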
Subscribe or resubscribe on messages from a partition.
The caller may specify a set of options extending the consumer config. It is possible to update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream. Note that you currently cannot update isolation_level.
See start_link/5.
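A sketch of re-subscribing with updated options via brod:subscribe/5 (the client name, topic, partition, and the SavedOffset variable are hypothetical):

%% Resume from a persisted offset and override max_bytes for this subscription.
%% my_client, <<"my_topic">>, partition 0 and SavedOffset are placeholders.
{ok, ConsumerPid} =
  brod:subscribe(my_client, self(), <<"my_topic">>, 0,
                 [ {begin_offset, SavedOffset + 1}
                 , {max_bytes, 2097152}   %% 2 MB
                 ]).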