brod_consumer (brod v4.3.2)

Kafka consumers work in poll mode. In brod, brod_consumer is the poller: it continuously requests more data from the Kafka broker that is the leader for the given partition.

A process that subscribes to a brod_consumer receives the polled message sets (not individual messages) in its mailbox. The shape of these messages is documented at brod:subscribe/5.

Messages processed by the subscriber must be acked by calling ack/2 (or brod:consume_ack/4). An ack tells the consumer that all messages up to the acknowledged offset have been processed, so it can fetch more messages and send them to the subscriber without overwhelming it.
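
The ack bookkeeping can be pictured with the minimal model below. It is a hypothetical sketch, not brod's internal code. The module pending_acks_model and its functions are invented for illustration. The model assumes the consumer tracks each delivered but unacked message as an {Offset, Bytes} pair in a FIFO queue, similar in spirit to the pending_acks() type listed under Types. Acking an offset releases every pending entry at or below it.

```erlang
-module(pending_acks_model).
%% Hypothetical model, NOT brod's implementation: unacked messages are
%% kept as {Offset, Bytes} pairs in a FIFO queue, plus running totals.
-export([new/0, add/3, ack/2, count/1, bytes/1]).

%% State is {Count, Bytes, Queue}.
new() -> {0, 0, queue:new()}.

%% Record a message delivered to the subscriber but not yet acked.
add({Count, Bytes, Q}, Offset, Size) ->
    {Count + 1, Bytes + Size, queue:in({Offset, Size}, Q)}.

%% Acking AckedOffset drops every pending entry whose offset =< AckedOffset.
ack({Count, Bytes, Q} = Acks, AckedOffset) ->
    case queue:peek(Q) of
        {value, {Offset, Size}} when Offset =< AckedOffset ->
            {{value, _}, Q1} = queue:out(Q),
            ack({Count - 1, Bytes - Size, Q1}, AckedOffset);
        _ ->
            %% Queue empty, or the oldest entry is newer than the ack.
            Acks
    end.

count({Count, _Bytes, _Q}) -> Count.
bytes({_Count, Bytes, _Q}) -> Bytes.
```

With a real consumer, the subscriber calls brod_consumer:ack(ConsumerPid, Offset) after it has handled a message set. The model only shows why one ack can release several pending messages at once.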

Each consumer can have only one subscriber.

See the overview for more information and examples.

Summary

Functions

Subscriber confirms that a message (identified by offset) has been consumed; the consumer process may then continue to fetch more messages.

Enable/disable debugging on the consumer process.

Get connection pid. Test/debug only.

Start (link) a partition consumer.

Subscribe or resubscribe to messages from a partition.

Unsubscribe the current subscriber.

Types

-type bytes() :: non_neg_integer().
-type config() :: brod:consumer_config().
-type isolation_level() :: kpro:isolation_level().
-type offset() :: brod:offset().
-type offset_reset_policy() :: reset_by_subscriber | reset_to_earliest | reset_to_latest.
-type offset_time() :: brod:offset_time().
-type partition() :: brod:partition().
-type pending() :: {offset(), bytes()}.
-type pending_acks() ::
          #pending_acks{count :: non_neg_integer(), bytes :: bytes(), queue :: pending_queue()}.
-type pending_queue() :: queue:queue(pending()).
-type state() ::
          #state{client_pid :: ignore | pid(),
                 bootstrap :: ignore | get | brod:bootstrap(),
                 connection :: undefined | pid(),
                 topic :: binary(),
                 partition :: integer(),
                 begin_offset :: offset_time(),
                 max_wait_time :: integer(),
                 min_bytes :: bytes(),
                 max_bytes_orig :: bytes(),
                 sleep_timeout :: integer(),
                 prefetch_count :: integer(),
                 last_req_ref :: undefined | reference(),
                 subscriber :: undefined | pid(),
                 subscriber_mref :: undefined | reference(),
                 pending_acks :: pending_acks(),
                 is_suspended :: boolean(),
                 offset_reset_policy :: offset_reset_policy(),
                 avg_bytes :: number(),
                 max_bytes :: bytes(),
                 size_stat_window :: non_neg_integer(),
                 prefetch_bytes :: non_neg_integer(),
                 connection_mref :: undefined | reference(),
                 isolation_level :: isolation_level()}.
-type topic() :: brod:topic().

Functions

-spec ack(pid(), brod:offset()) -> ok.

Subscriber confirms that a message (identified by offset) has been consumed; the consumer process may then continue to fetch more messages.

-spec debug(pid(), print | string() | none) -> ok.

Enable/disable debugging on the consumer process.

debug(Pid, print) prints debug info to stdout.

debug(Pid, File) prints debug info to the file File.

debug(Pid, none) disables debugging.

Get connection pid. Test/debug only.

start_link(Bootstrap, Topic, Partition, Config)

-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config()) ->
                    {ok, pid()} | {error, any()}.

Equivalent to start_link(Bootstrap, Topic, Partition, Config, []).

start_link(Bootstrap, Topic, Partition, Config, Debug)

-spec start_link(pid() | brod:bootstrap(), topic(), partition(), config(), [any()]) ->
                    {ok, pid()} | {error, any()}.

Start (link) a partition consumer.

Possible configs:
  • min_bytes (optional, default = 0)

    Minimum number of bytes to fetch in a batch of messages.
  • max_bytes (optional, default = 1MB)

    Maximum bytes to fetch in a batch of messages.

    NOTE: this value may be temporarily expanded (and the fetch retried) when it is too small to fetch even a single message, and is then gradually shrunk back to the configured value.
  • max_wait_time (optional, default = 10000 ms)

    Maximum time (in milliseconds) the broker may wait to collect min_bytes of messages before returning the fetch response.
  • sleep_timeout (optional, default = 1000 ms)

    Time (in milliseconds) the consumer process sleeps before fetching again if Kafka replied with an empty message set.
  • prefetch_count (optional, default = 10)

    The window size (number of messages) allowed to fetch-ahead.
  • prefetch_bytes (optional, default = 100KB)

    The total number of bytes allowed to fetch ahead. brod_consumer is greedy: it stops fetching more messages only when the number of unacked messages has exceeded prefetch_count AND the total size of unacked messages has exceeded prefetch_bytes.
  • begin_offset (optional, default = latest)

    The offset from which to begin fetch requests. A subscriber may consume and process messages, persist the associated offset to persistent storage, and then start (or restart) with last_processed_offset + 1 as the begin_offset. The offset must already exist at the time of calling.
  • offset_reset_policy (optional, default = reset_by_subscriber)

    How to reset begin_offset if an OffsetOutOfRange error is received.

    reset_by_subscriber: the consumer is suspended (is_suspended = true in its state) and waits for the subscriber to re-subscribe with a new begin_offset option.

    reset_to_earliest: consume from the earliest offset.

    reset_to_latest: consume from the last available offset.
  • size_stat_window (optional, default = 5)

    The moving-average window size used to calculate the average message size. The average message size is used to shrink max_bytes in fetch requests after it has been expanded to fetch a large message. Use 0 to shrink immediately back to the original max_bytes from the config. This size estimation lets users set a relatively small max_bytes and have it adjust dynamically to a value around PrefetchCount * AverageSize.
  • isolation_level (optional, default = read_committed)

    Controls which transactional records are exposed to the consumer. Two values are allowed: read_uncommitted retrieves all records regardless of the transaction outcome (if any), and read_committed retrieves only the records from committed transactions.
  • share_leader_conn (optional, default = false)

    Whether to share the partition leader connection with other producers or consumers. Set it to true to use fewer TCP connections to Kafka, at the cost of possibly higher fetch latency. Kafka can only accumulate messages for the oldest fetch request on a connection, so later requests queued behind it may be blocked until max_wait_time expires for the oldest one.
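
Two of the rules above are easy to misread: the greedy prefetch condition and offset_reset_policy. Both can be modeled as pure functions. This is a hypothetical sketch rather than brod's code. consumer_rules_model, fetch_allowed/4 and reset_begin_offset/3 are invented names. reset_begin_offset/3 assumes the earliest and latest offsets of the partition have already been resolved by the caller.

```erlang
-module(consumer_rules_model).
%% Hypothetical model of two documented config rules. It does not call
%% brod and is not brod's implementation.
-export([fetch_allowed/4, reset_begin_offset/3]).

%% prefetch_count / prefetch_bytes: fetching stops only when BOTH limits
%% are exceeded by the messages still waiting for an ack.
fetch_allowed(UnackedCount, UnackedBytes, PrefetchCount, PrefetchBytes) ->
    not (UnackedCount > PrefetchCount andalso UnackedBytes > PrefetchBytes).

%% offset_reset_policy: reaction to an OffsetOutOfRange error.
%% Earliest and Latest are assumed to be already-resolved partition offsets.
reset_begin_offset(reset_by_subscriber, _Earliest, _Latest) ->
    suspend;  % wait for the subscriber to re-subscribe with a new begin_offset
reset_begin_offset(reset_to_earliest, Earliest, _Latest) ->
    {resume, Earliest};
reset_begin_offset(reset_to_latest, _Earliest, Latest) ->
    {resume, Latest}.
```

Consider the defaults prefetch_count = 10 and prefetch_bytes = 100KB. Here 11 unacked messages totalling 50,000 bytes still allow another fetch, because only the count limit has been exceeded.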

-spec stop(pid()) -> ok | {error, any()}.

stop_maybe_kill(Pid, Timeout)

-spec stop_maybe_kill(pid(), timeout()) -> ok.

subscribe(Pid, SubscriberPid, ConsumerOptions)

-spec subscribe(pid(), pid(), config()) -> ok | {error, any()}.

Subscribe or resubscribe to messages from a partition.

The caller may specify a set of options that extend the consumer config. These options can update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream. Note that isolation_level currently cannot be updated.

Possible options are documented at start_link/5.
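
The helper below sketches resubscribing from a persisted position. It builds a ConsumerOptions list that sets begin_offset to last_processed_offset + 1 (as described for begin_offset under start_link/5) and also updates max_bytes. The module and function names are hypothetical; only brod_consumer:subscribe/3 is part of brod. The sketch assumes the consumer config is passed as a {Key, Value} proplist.

```erlang
-module(resubscribe_example).
%% Sketch only: resume_options/2 and resubscribe/3 are hypothetical
%% helpers; brod_consumer:subscribe/3 is the brod API documented above.
-export([resume_options/2, resubscribe/3]).

%% Resume right after the last offset the subscriber has persisted, and
%% update max_bytes, one of the parameters that may change on resubscribe.
resume_options(LastProcessedOffset, MaxBytes) ->
    [{begin_offset, LastProcessedOffset + 1}, {max_bytes, MaxBytes}].

%% self() is passed as SubscriberPid, so the calling process becomes
%% the (single) subscriber of ConsumerPid.
resubscribe(ConsumerPid, LastProcessedOffset, MaxBytes) ->
    brod_consumer:subscribe(ConsumerPid, self(),
                            resume_options(LastProcessedOffset, MaxBytes)).
```

Calling resubscribe(ConsumerPid, 41, 1048576) asks the consumer to resume fetching at offset 42 with max_bytes set to 1MB.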

unsubscribe(Pid, SubscriberPid)

-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.

Unsubscribe the current subscriber.