View Source brod_consumer (brod v3.16.3)
Link to this section Summary
Functions
Enable/disable debugging on the consumer process.
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
Start (link) a partition consumer.
Subscribe or resubscribe on messages from a partition. Caller may specify a set of options extending consumer config. It is possible to update parameters such as max_bytes
and max_wait_time
, or the starting point (begin_offset
) of the data stream.
Link to this section Types
-type bytes() :: non_neg_integer().
-type config() :: proplists:proplist().
-type isolation_level() :: kpro:isolation_level().
-type offset() :: brod:offset().
-type offset_reset_policy() :: reset_by_subscriber | reset_to_earliest | reset_to_latest.
-type offset_time() :: brod:offset_time().
-type options() :: brod:consumer_options().
-type partition() :: brod:partition().
-type pending_acks() :: #pending_acks{}.
-type pending_queue() :: queue:queue(pending()).
-type state() :: #state{}.
-type topic() :: brod:topic().
Link to this section Functions
-spec ack(pid(), brod:offset()) -> ok.
-spec debug(pid(), print | string() | none) -> ok.
Enable/disable debugging on the consumer process.
debug(Pid, print)
prints debug info to stdout.
debug(Pid, File)
prints debug info to a file File
.
-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config()) -> {ok, pid()} | {error, any()}.
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config(), [any()]) -> {ok, pid()} | {error, any()}.
Start (link) a partition consumer.
Possible configs:
Minimal bytes to fetch in a batch of messagesmin_bytes
(optional, default = 0)max_bytes
(optional, default = 1MB)Maximum bytes to fetch in a batch of messages.
NOTE: this value might be expanded to retry when it is not enough to fetch even a single message, then slowly shrunk back to the given value.
Max number of seconds allowed for the broker to collectmax_wait_time
(optional, default = 10000 ms)min_bytes
of messages in fetch response
Allow consumer process to sleep this amount of ms if kafka replied 'empty' message set.sleep_timeout
(optional, default = 1000 ms)
The window size (number of messages) allowed to fetch-ahead.prefetch_count
(optional, default = 10)
The total number of bytes allowed to fetch-ahead. brod_consumer is greed, it only stops fetching more messages in when number of unacked messages has exceeded prefetch_count AND the unacked total volume has exceeded prefetch_bytesprefetch_bytes
(optional, default = 100KB)
The offset from which to begin fetch requests.begin_offset
(optional, default = latest)offset_reset_policy
(optional, default = reset_by_subscriber)How to reset
begin_offset
ifOffsetOutOfRange
exception is received.reset_by_subscriber
: consumer is suspended (is_suspended=true
in state) and wait for subscriber to re-subscribe with a newbegin_offset
option.reset_to_earliest
: consume from the earliest offset.reset_to_latest
: consume from the last available offset.
The moving-average window size to calculate average message size. Average message size is used to shrinksize_stat_window
: (optional, default = 5)max_bytes
in fetch requests after it has been expanded to fetch a large message. Use 0 to immediately shrink back to originalmax_bytes
from config. A size estimation allows users to set a relatively smallmax_bytes
, then let it dynamically adjust to a number aroundPrefetchCount * AverageSize
Level to control what transaction records are exposed to the consumer. Two values are allowed,isolation_level
: (optional, default =read_commited
)read_uncommitted
to retrieve all records, independently on the transaction outcome (if any), andread_committed
to get only the records from committed transactions
-spec stop(pid()) -> ok | {error, any()}.
-spec stop_maybe_kill(pid(), timeout()) -> ok.
-spec subscribe(pid(), pid(), options()) -> ok | {error, any()}.
Subscribe or resubscribe on messages from a partition. Caller may specify a set of options extending consumer config. It is possible to update parameters such as max_bytes
and max_wait_time
, or the starting point (begin_offset
) of the data stream.
Possible options:
All consumer configs as documented for start_link/5
begin_offset
(optional, default = latest)
last_processed_offset + 1
as the begin_offset
to proceed. By default, it starts fetching from the latest available offset.
-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.