Behaviours: gen_server.
config() = proplists:proplist()
options() = brod:consumer_options()
partition() = brod:partition()
topic() = brod:topic()
ack/2 | Subscriber confirms that a message (identified by offset) has been consumed; the consumer process may now continue to fetch more messages. |
debug/2 | Enable/disable debugging on the consumer process. |
get_connection/1 | Get connection pid. |
init/1 | |
start_link/4 | Equivalent to start_link(ClientPid, Topic, Partition, Config, []). |
start_link/5 | Start (link) a partition consumer. |
stop/1 | |
stop_maybe_kill/2 | |
subscribe/3 | Subscribe or resubscribe on messages from a partition. |
unsubscribe/2 | Unsubscribe the current subscriber. |
ack(Pid::pid(), Offset::brod:offset()) -> ok
Subscriber confirms that a message (identified by offset) has been consumed; the consumer process may now continue to fetch more messages.
debug(Pid::pid(), File::print | string() | none) -> ok
Enable/disable debugging on the consumer process.
debug(Pid, print) prints debug info to stdout.
debug(Pid, File) prints debug info to the file File.
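The three forms of the File argument can be sketched as follows. This is a minimal illustration, not taken from the library's own examples: it assumes this page documents the module brod_consumer, that Pid is a running consumer process, and that passing none turns debugging off (inferred from the spec and the "enable/disable" wording). The module name debug_example and the log path are hypothetical.

```erlang
-module(debug_example).
-export([toggle_debug/1]).

%% Pid is assumed to be the pid of a running partition consumer.
%% The module name brod_consumer is an assumption; this page does not state it.
toggle_debug(Pid) ->
    %% Print debug info to stdout.
    ok = brod_consumer:debug(Pid, print),
    %% Print debug info to a file (hypothetical path).
    ok = brod_consumer:debug(Pid, "/tmp/consumer_debug.log"),
    %% Assumed to disable debugging again.
    ok = brod_consumer:debug(Pid, none).
```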
get_connection(Pid) -> any()
Get connection pid. Test/debug only.
init(X1) -> any()
start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config()) -> {ok, pid()} | {error, any()}
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config(), Debug::[any()]) -> {ok, pid()} | {error, any()}
Start (link) a partition consumer.
Possible configs:
min_bytes (optional, default = 0): Minimum bytes to fetch in a batch of messages.
max_bytes (optional, default = 1MB): Maximum bytes to fetch in a batch of messages. NOTE: this value might be expanded to retry when it is not enough to fetch even a single message, then slowly shrunk back to the given value.
max_wait_time (optional, default = 10000 ms): Maximum time the broker may wait to collect min_bytes of messages in a fetch response.
sleep_timeout (optional, default = 1000 ms): Time the consumer process sleeps when Kafka replies with an empty message set.
prefetch_count (optional, default = 10): Window size (number of messages) the consumer may fetch ahead of acknowledgements.
prefetch_bytes (optional, default = 100KB): Total number of bytes the consumer may fetch ahead of acknowledgements.
begin_offset (optional, default = latest): The offset from which messages are fetched and delivered to the subscriber.
offset_reset_policy (optional, default = reset_by_subscriber): How to reset begin_offset if an OffsetOutOfRange exception is received.
  reset_by_subscriber: the consumer is suspended (is_suspended=true in state) and waits for the subscriber to re-subscribe with a new begin_offset option.
  reset_to_earliest: consume from the earliest offset.
  reset_to_latest: consume from the last available offset.
size_stat_window (optional, default = 5): Moving-average window size (number of messages) used to estimate the average message size. The estimate is used to shrink max_bytes in fetch requests after it has been expanded to fetch a large message. Use 0 to immediately shrink back to the original max_bytes from config. A size estimation allows users to set a relatively small max_bytes, then let it dynamically adjust to a number around PrefetchCount * AverageSize.
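The configs above are passed as a proplist. The sketch below shows one possible shape, under stated assumptions: the module name brod_consumer is not given on this page, ClientPid is the pid of an already started client, and the topic name, partition number, and the module name consumer_config_example are hypothetical. The byte values spell out the documented 1MB and 100KB defaults as integers.

```erlang
-module(consumer_config_example).
-export([config/0, start/1]).

%% Consumer config built from the keys documented for start_link/5.
config() ->
    [ {min_bytes, 0}
    , {max_bytes, 1048576}                      %% 1MB
    , {max_wait_time, 10000}                    %% ms
    , {sleep_timeout, 1000}                     %% ms
    , {prefetch_count, 10}
    , {prefetch_bytes, 102400}                  %% 100KB
    , {begin_offset, latest}
    , {offset_reset_policy, reset_to_earliest}  %% non-default, for illustration
    , {size_stat_window, 5}
    ].

%% ClientPid: assumed pid of a started client process.
%% Topic <<"example-topic">> and partition 0 are hypothetical.
start(ClientPid) ->
    brod_consumer:start_link(ClientPid, <<"example-topic">>, 0, config()).
```

Every key is optional, so in practice only the entries that differ from the defaults need to be listed.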
stop(Pid::pid()) -> ok | {error, any()}
stop_maybe_kill(Pid::pid(), Timeout::timeout()) -> ok
subscribe(Pid::pid(), SubscriberPid::pid(), ConsumerOptions::options()) -> ok | {error, any()}
Subscribe or resubscribe on messages from a partition. The caller may specify a set of options extending the consumer config. It is possible to update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream.
Possible options:
All consumer configs as documented for start_link/5.
begin_offset (optional, default = latest): A subscriber may consume and process messages, persist the associated offset, then start (or restart) with last_processed_offset + 1 as the begin_offset to proceed. By default, it starts fetching from the latest available offset.
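A resubscribe from a persisted offset, followed by an acknowledge loop, might look like the sketch below. Several things here are assumptions not stated on this page: the module name brod_consumer, the shape of the messages delivered to the subscriber ({ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}, with records taken from brod's brod.hrl), and the stop message used to leave the loop. The module name subscriber_example and the source of LastProcessedOffset are hypothetical.

```erlang
-module(subscriber_example).
-export([run/2]).

%% Assumed to provide #kafka_message_set{}, #kafka_message{} and
%% #kafka_fetch_error{}; not confirmed by this page.
-include_lib("brod/include/brod.hrl").

%% ConsumerPid: pid returned by start_link/4 or start_link/5.
%% LastProcessedOffset: offset the caller persisted earlier (hypothetical).
run(ConsumerPid, LastProcessedOffset) ->
    Options = [{begin_offset, LastProcessedOffset + 1}],
    case brod_consumer:subscribe(ConsumerPid, self(), Options) of
        ok -> loop(ConsumerPid);
        {error, Reason} -> {error, Reason}
    end.

loop(ConsumerPid) ->
    receive
        %% Assumed delivery format for a fetched batch.
        {ConsumerPid, #kafka_message_set{messages = Messages}} ->
            lists:foreach(
              fun(#kafka_message{offset = Offset, value = Value}) ->
                      io:format("offset ~p: ~p~n", [Offset, Value]),
                      %% Acknowledge so the consumer may fetch more messages.
                      ok = brod_consumer:ack(ConsumerPid, Offset)
              end, Messages),
            loop(ConsumerPid);
        %% Assumed delivery format for a fetch error.
        {ConsumerPid, #kafka_fetch_error{error_code = Code}} ->
            {error, Code};
        %% Hypothetical control message from the caller.
        stop ->
            brod_consumer:unsubscribe(ConsumerPid, self())
    end.
```

Acknowledging each offset only after the message has been processed keeps the persisted offset and the consumer's prefetch window in step, which is what makes restarting from last_processed_offset + 1 safe.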
unsubscribe(Pid::pid(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.