Behaviours: gen_server.
config() = proplists:proplist()
options() = brod:consumer_options()
partition() = brod:partition()
topic() = brod:topic()
| ack/2 | Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages. |
| debug/2 | Enable/disable debugging on the consumer process. |
| get_connection/1 | Get connection pid. |
| init/1 | |
| start_link/4 | Equivalent to start_link(ClientPid, Topic, Partition, Config, []). |
| start_link/5 | Start (link) a partition consumer. |
| stop/1 | |
| stop_maybe_kill/2 | |
| subscribe/3 | Subscribe or resubscribe on messages from a partition. |
| unsubscribe/2 | Unsubscribe the current subscriber. |
ack(Pid::pid(), Offset::brod:offset()) -> ok
Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages.
debug(Pid::pid(), File::print | string() | none) -> ok
Enable/disable debugging on the consumer process.
debug(Pid, print) prints debug info to stdout.
debug(Pid, File) prints debug info to a file File.
get_connection(Pid) -> any()
Get connection pid. Test/debug only.
init(X1) -> any()
start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config()) -> {ok, pid()} | {error, any()}
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config(), Debug::[any()]) -> {ok, pid()} | {error, any()}
Start (link) a partition consumer.
Possible configs:min_bytes (optional, default = 0)
max_bytes (optional, default = 1MB)
Maximum bytes to fetch in a batch of messages.
NOTE: this value might be expanded to retry when it is not enough to fetch even a single message, then slowly shrinked back to the given value.max_wait_time (optional, default = 10000 ms)
min_bytes of messages in fetch responsesleep_timeout (optional, default = 1000 ms)
prefetch_count (optional, default = 10)
prefetch_bytes (optional, default = 100KB)
begin_offset (optional, default = latest)
offset_reset_policy (optional, default = reset_by_subscriber)
How to reset begin_offset if OffsetOutOfRange exception is received.
reset_by_subscriber: consumer is suspended
(is_suspended=true in state) and wait
for subscriber to re-subscribe with a new
begin_offset option.
reset_to_earliest: consume from the earliest offset.
reset_to_latest: consume from the last available offset.size_stat_window: (optional, default = 5)
max_bytes in
fetch requests after it has been expanded to fetch a large
message. Use 0 to immediately shrink back to original
max_bytes from config. A size estimation allows users to set
a relatively small max_bytes, then let it dynamically adjust
to a number around PrefetchCount * AverageSizestop(Pid::pid()) -> ok | {error, any()}
stop_maybe_kill(Pid::pid(), Timeout::timeout()) -> ok
subscribe(Pid::pid(), SubscriberPid::pid(), ConsumerOptions::options()) -> ok | {error, any()}
Subscribe or resubscribe on messages from a partition. Caller
may specify a set of options extending consumer config. It is
possible to update parameters such as max_bytes and
max_wait_time, or the starting point (begin_offset) of the data
stream.
Possible options:
All consumer configs as documented for start_link/5
begin_offset (optional, default = latest)
last_processed_offset + 1 as the begin_offset
to proceed. By default, it starts fetching from the latest
available offset.
unsubscribe(Pid::pid(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
Generated by EDoc