Module brod_consumer

Behaviours: gen_server.

Data Types

config()

config() = proplists:proplist()

options()

options() = brod:consumer_options()

partition()

partition() = brod:partition()

topic()

topic() = brod:topic()

Function Index

ack/2              Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages.
debug/2            Enable/disable debugging on the consumer process.
get_connection/1   Get connection pid.
init/1
start_link/4       Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
start_link/5       Start (link) a partition consumer.
stop/1
stop_maybe_kill/2
subscribe/3        Subscribe or resubscribe on messages from a partition.
unsubscribe/2      Unsubscribe the current subscriber.

Function Details

ack/2

ack(Pid::pid(), Offset::brod:offset()) -> ok

The subscriber confirms that a message (identified by its offset) has been consumed, so the consumer process may continue to fetch more messages.
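The acknowledgement flow can be sketched as a small subscriber loop. This is a minimal sketch, not the library's own subscriber: it assumes the consumer delivers batches to the subscriber as {ConsumerPid, #kafka_message_set{}} tuples and that the kafka_message_set and kafka_message records are available through brod.hrl. The module name ack_sketch, the HandleFun callback, and the stop message are hypothetical.

```erlang
-module(ack_sketch).
-export([loop/2]).

%% Assumption: brod.hrl provides the kafka_message_set and kafka_message records.
-include_lib("brod/include/brod.hrl").

%% Receive message sets from the consumer, process each message with
%% HandleFun, and acknowledge its offset so the consumer may fetch more.
loop(ConsumerPid, HandleFun) when is_function(HandleFun, 1) ->
    receive
        {ConsumerPid, #kafka_message_set{messages = Messages}} ->
            lists:foreach(
              fun(#kafka_message{offset = Offset} = Message) ->
                      _ = HandleFun(Message),
                      %% Confirm consumption only after processing succeeded.
                      ok = brod_consumer:ack(ConsumerPid, Offset)
              end, Messages),
            loop(ConsumerPid, HandleFun);
        stop ->
            %% Hypothetical control message used to end the sketch loop.
            ok
    end.
```

Acknowledging after HandleFun returns means a crash during processing leaves the message unacknowledged, which is usually the safer ordering.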

debug/2

debug(Pid::pid(), File::print | string() | none) -> ok

Enable/disable debugging on the consumer process.

debug(Pid, print) prints debug info to stdout.

debug(Pid, File) prints debug info to the file File.

debug(Pid, none) disables debugging.
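As an illustration, debugging can be switched on for the duration of one operation and switched off afterwards. This is a sketch under the assumption that passing none turns debugging off, as the type signature and the "enable/disable" summary suggest. The module name consumer_debug_sketch and the helper with_debug/3 are hypothetical.

```erlang
-module(consumer_debug_sketch).
-export([with_debug/3]).

%% Enable debugging to Target (the atom print or a file name string),
%% run Fun, and always disable debugging again afterwards.
with_debug(ConsumerPid, Target, Fun) when is_function(Fun, 0) ->
    ok = brod_consumer:debug(ConsumerPid, Target),
    try
        Fun()
    after
        %% Assumption: none disables debugging.
        brod_consumer:debug(ConsumerPid, none)
    end.
```

For example, consumer_debug_sketch:with_debug(Pid, print, fun() -> timer:sleep(5000) end) would print debug info to stdout for about five seconds.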

get_connection/1

get_connection(Pid) -> any()

Get connection pid. Test/debug only.

init/1

init(X1) -> any()

start_link/4

start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config()) -> {ok, pid()} | {error, any()}

Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
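A minimal sketch of starting a partition consumer from an existing client process follows. The config keys max_bytes and max_wait_time are the ones named under subscribe/3; the values are illustrative only, and the module name consumer_start_sketch and its functions are hypothetical.

```erlang
-module(consumer_start_sketch).
-export([config/0, start/3]).

%% Illustrative consumer config; the values are placeholders, not
%% recommendations.
config() ->
    [{max_bytes, 1048576},
     {max_wait_time, 1000}].

%% Start (and link) a consumer for one partition of Topic, using an
%% already running client process.
start(ClientPid, Topic, Partition) when is_pid(ClientPid) ->
    brod_consumer:start_link(ClientPid, Topic, Partition, config()).
```

The call returns {ok, Pid} or {error, Reason}, as stated in the signature above.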

start_link/5

start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config(), Debug::[any()]) -> {ok, pid()} | {error, any()}

Start (link) a partition consumer.

Possible configs:

stop/1

stop(Pid::pid()) -> ok | {error, any()}

stop_maybe_kill/2

stop_maybe_kill(Pid::pid(), Timeout::timeout()) -> ok

subscribe/3

subscribe(Pid::pid(), SubscriberPid::pid(), ConsumerOptions::options()) -> ok | {error, any()}

Subscribe or resubscribe to messages from a partition. The caller may specify a set of options that extend the consumer config. This makes it possible to update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream.

Possible options:

All consumer configs as documented for start_link/5

begin_offset (optional, default = latest)

A subscriber may consume and process messages, persist the associated offset to persistent storage, and then start (or restart) with last_processed_offset + 1 as the begin_offset. By default, fetching starts from the latest available offset.
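The restart pattern described above can be sketched as follows. This is a minimal sketch that assumes the options are given as a property list, as the config() type suggests. The module name resume_sketch and both functions are hypothetical, and loading the persisted offset is left to the caller.

```erlang
-module(resume_sketch).
-export([resume_options/2, resubscribe/3]).

%% Build subscribe/3 options that restart the stream right after the
%% last offset the subscriber finished processing. Any begin_offset in
%% ExtraOptions is replaced.
resume_options(LastProcessedOffset, ExtraOptions)
  when is_integer(LastProcessedOffset), LastProcessedOffset >= 0 ->
    BeginOffset = LastProcessedOffset + 1,
    [{begin_offset, BeginOffset}
     | proplists:delete(begin_offset, ExtraOptions)].

%% Resubscribe the calling process, continuing after LastProcessedOffset.
resubscribe(ConsumerPid, LastProcessedOffset, ExtraOptions) ->
    Options = resume_options(LastProcessedOffset, ExtraOptions),
    brod_consumer:subscribe(ConsumerPid, self(), Options).
```

Keeping the option building separate from the subscribe call makes the offset arithmetic easy to check without a running consumer.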

unsubscribe/2

unsubscribe(Pid::pid(), SubscriberPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.

