View Source brod_consumer (brod v3.16.3)

Link to this section Summary

Functions

Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages.

Enable/disable debugging on the consumer process.

Get connection pid. Test/debug only.

Start (link) a partition consumer.

Subscribe or resubscribe on messages from a partition. Caller may specify a set of options extending consumer config. It is possible to update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream.

Unsubscribe the current subscriber.

Link to this section Types

-type bytes() :: non_neg_integer().
-type config() :: proplists:proplist().
-type isolation_level() :: kpro:isolation_level().
-type offset() :: brod:offset().
-type offset_reset_policy() :: reset_by_subscriber | reset_to_earliest | reset_to_latest.
-type offset_time() :: brod:offset_time().
-type options() :: brod:consumer_options().
-type partition() :: brod:partition().
-type pending() :: {offset(), bytes()}.
-type pending_acks() :: #pending_acks{}.
-type pending_queue() :: queue:queue(pending()).
-type state() :: #state{}.
-type topic() :: brod:topic().

Link to this section Functions

-spec ack(pid(), brod:offset()) -> ok.
Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages.
-spec debug(pid(), print | string() | none) -> ok.

Enable/disable debugging on the consumer process.

debug(Pid, print) prints debug info to stdout.

debug(Pid, File) prints debug info to a file File.
Get connection pid. Test/debug only.
Link to this function

start_link(Bootstrap, Topic, Partition, Config)

View Source
-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config()) ->
              {ok, pid()} | {error, any()}.

Equivalent to start_link(ClientPid, Topic, Partition, Config, []).

Link to this function

start_link(Bootstrap, Topic, Partition, Config, Debug)

View Source
-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config(), [any()]) ->
              {ok, pid()} | {error, any()}.

Start (link) a partition consumer.

Possible configs:
  • min_bytes (optional, default = 0)

    Minimal bytes to fetch in a batch of messages
  • max_bytes (optional, default = 1MB)

    Maximum bytes to fetch in a batch of messages.

    NOTE: this value might be expanded to retry when it is not enough to fetch even a single message, then slowly shrunk back to the given value.
  • max_wait_time (optional, default = 10000 ms)

    Max number of seconds allowed for the broker to collect min_bytes of messages in fetch response
  • sleep_timeout (optional, default = 1000 ms)

    Allow consumer process to sleep this amount of ms if kafka replied 'empty' message set.
  • prefetch_count (optional, default = 10)

    The window size (number of messages) allowed to fetch-ahead.
  • prefetch_bytes (optional, default = 100KB)

    The total number of bytes allowed to fetch-ahead. brod_consumer is greed, it only stops fetching more messages in when number of unacked messages has exceeded prefetch_count AND the unacked total volume has exceeded prefetch_bytes
  • begin_offset (optional, default = latest)

    The offset from which to begin fetch requests.
  • offset_reset_policy (optional, default = reset_by_subscriber)

    How to reset begin_offset if OffsetOutOfRange exception is received.

    reset_by_subscriber: consumer is suspended (is_suspended=true in state) and wait for subscriber to re-subscribe with a new begin_offset option.

    reset_to_earliest: consume from the earliest offset.

    reset_to_latest: consume from the last available offset.
  • size_stat_window: (optional, default = 5)

    The moving-average window size to calculate average message size. Average message size is used to shrink max_bytes in fetch requests after it has been expanded to fetch a large message. Use 0 to immediately shrink back to original max_bytes from config. A size estimation allows users to set a relatively small max_bytes, then let it dynamically adjust to a number around PrefetchCount * AverageSize
  • isolation_level: (optional, default = read_commited)

    Level to control what transaction records are exposed to the consumer. Two values are allowed, read_uncommitted to retrieve all records, independently on the transaction outcome (if any), and read_committed to get only the records from committed transactions
-spec stop(pid()) -> ok | {error, any()}.
Link to this function

stop_maybe_kill(Pid, Timeout)

View Source
-spec stop_maybe_kill(pid(), timeout()) -> ok.
Link to this function

subscribe(Pid, SubscriberPid, ConsumerOptions)

View Source
-spec subscribe(pid(), pid(), options()) -> ok | {error, any()}.

Subscribe or resubscribe on messages from a partition. Caller may specify a set of options extending consumer config. It is possible to update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream.

Possible options:

All consumer configs as documented for start_link/5

begin_offset (optional, default = latest)

A subscriber may consume and process messages, then persist the associated offset to a persistent storage, then start (or restart) from last_processed_offset + 1 as the begin_offset to proceed. By default, it starts fetching from the latest available offset.
Link to this function

unsubscribe(Pid, SubscriberPid)

View Source
-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
Unsubscribe the current subscriber.