kpro_req_lib (kafka_protocol v4.1.10)

Summary

Functions

Make add_offsets_to_txn request.
Make add_partitions_to_txn request.
Make an alter_configs request.
Make a create_partitions request.
Make a create_topics request. If 0 is given as the timeout option, the request triggers topic creation but returns immediately. The validate_only option is only relevant when the API version is greater than 0.
Make delete_topics request.
Make a describe_configs request. The include_synonyms option is only relevant when the API version is greater than 0.
Encode a request to bytes that can be sent on wire.
Make end_txn request.
Helper function to construct a fetch request against a single topic-partition. In transactional mode, set isolation_level to read_uncommitted in Opts to fetch uncommitted messages.
Helper function to construct a list_offsets request against a single topic-partition.
Helper function to construct a list_offsets request against a single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.
Extends list_offsets/5 with a leader-epoch number, which can be obtained from the metadata response for each partition.
Helper function to make a request body.
Make a metadata request.
Helper function to construct a produce request.
Helper function to construct a produce request. By default, it constructs a non-transactional produce request. For transactional produce requests, the following conditions must be met. 1. The Batch argument must be a [map()] to indicate magic v2, for example: [#{key => Key, value => Value, ts => Ts}]. The current system time is used if ts is missing from the batch input. It may also be a binary() if the user chooses to encode the batch beforehand. This can be helpful when a large batch is encoded in another process, so that a large binary rather than a list is passed between processes. 2. first_sequence must exist in Opts. It should be the sequence number of the first message in the batch. Sequence numbers are maintained by the producer; they should start from zero and increase monotonically, with a separate sequence per topic-partition. 3. txn_ctx (of type kpro:txn_ctx()) must exist in Opts.
Make a txn_offset_commit request.

Types

-type api() :: kpro:api().
-type batch_input() :: kpro:batch_input().
-type client_id() :: kpro:client_id().
-type corr_id() :: kpro:corr_id().
-type count() :: kpro:count().
-type fetch_opts() ::
          #{max_wait_time => wait(),
            min_bytes => count(),
            max_bytes => count(),
            isolation_level => isolation_level(),
            session_id => kpro:int32(),
            session_epoch => kpro:int32(),
            rack_id => iodata()}.

Options for a fetch request.

It is a map with the following keys (all of them are optional):
  • max_wait_time: The maximum time (in milliseconds) to block waiting until enough messages have accumulated to total at least min_bytes bytes. The wait ends as soon as either min_bytes is satisfied or max_wait_time is exceeded, whichever comes first. Defaults to 1 second.
  • min_bytes: The minimum size of the message set. If there are not enough messages, Kafka blocks and waits (but for at most max_wait_time). This implies that the response may actually be smaller if the time runs out. If you set it to 0, Kafka responds immediately (possibly with an empty message set). You can use this option together with max_wait_time to tune throughput, latency, and the size of message sets. Defaults to 0.
  • max_bytes: The maximum size of the message set. Note that this is not an absolute maximum: if the first message in the message set is larger than this value, the message is still returned to ensure that progress can be made. Defaults to 1 MB.
  • isolation_level: This setting controls the visibility of transactional records. Using read_uncommitted makes all records visible. With read_committed, non-transactional and committed transactional records are visible. To be more concrete, read_committed returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard aborted transactional records. Defaults to read_committed.
  • session_id: Fetch session ID. This can be useful when the fetch request spans multiple topic-partitions. However, fetch requests in kpro can span only a single topic-partition, so kpro by default does not use fetch sessions (by setting appropriate session_id and session_epoch default values). Defaults to 0.
  • session_epoch: Fetch session epoch. The note on session_id above applies here as well. Defaults to -1.
  • rack_id: The consumer's rack ID. This allows consumers to fetch from the closest replica (instead of the leader). Defaults to undefined.
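
As an illustration of how max_wait_time and min_bytes interact, the following is a minimal sketch of an options map that favours latency over batch size. The module name, the function names, and the concrete values are assumptions made for this example; Vsn is assumed to be a fetch API version the caller has already negotiated with the broker.

```erlang
-module(fetch_opts_example).
-export([low_latency_opts/0, build/4]).

%% Hypothetical option set: answer as soon as any data is available,
%% but never block for longer than 100 ms.
low_latency_opts() ->
    #{max_wait_time => 100,          % milliseconds
      min_bytes => 1,                % wait for at least one byte of data
      max_bytes => 512 * 1024,       % soft cap; an oversized first message is still returned
      isolation_level => read_committed}.

%% Topic is assumed to be a binary; Partition and Offset are integers.
%% Keys left out of the map fall back to the documented defaults.
build(Vsn, Topic, Partition, Offset) ->
    kpro_req_lib:fetch(Vsn, Topic, Partition, Offset, low_latency_opts()).
```

Raising min_bytes and max_wait_time together trades latency for larger message sets per response.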
-type group_id() :: kpro:group_id().
-type isolation_level() :: kpro:isolation_level().
-type msg_ts() :: kpro:msg_ts().
-type offset() :: kpro:offset().
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type produce_opts() :: kpro:produce_opts().
-type req() :: kpro:req().
-type struct() :: kpro:struct().
-type topic() :: kpro:topic().
-type txn_ctx() :: kpro:txn_ctx().
-type vsn() :: kpro:vsn().
-type wait() :: kpro:wait().

Functions


add_offsets_to_txn(TxnCtx, CgId)

-spec add_offsets_to_txn(txn_ctx(), group_id()) -> req().
Make add_offsets_to_txn request.

add_partitions_to_txn(TxnCtx, TopicPartitionList)

-spec add_partitions_to_txn(txn_ctx(), [{topic(), partition()}]) -> req().
Make add_partitions_to_txn request.

alter_configs(Vsn, Resources, Opts)

-spec alter_configs(vsn(), [Resources :: kpro:struct()], #{validate_only => boolean()}) -> req().
Make an alter_configs request.

create_partitions(Vsn, Topics, Opts)

-spec create_partitions(vsn(),
                        [Topics :: kpro:struct()],
                        #{timeout => kpro:int32(), validate_only => boolean()}) ->
                           req().
Make a create_partitions request.

create_topics(Vsn, Topics, Opts)

-spec create_topics(vsn(),
                    [Topics :: kpro:struct()],
                    #{timeout => kpro:int32(), validate_only => boolean()}) ->
                       req().
Make a create_topics request. If 0 is given as the timeout option, the request triggers topic creation but returns immediately. The validate_only option is only relevant when the API version is greater than 0.

delete_topics(Vsn, Topics, Opts)

-spec delete_topics(vsn(), [topic()], #{timeout => kpro:int32()}) -> req().
Make delete_topics request.

describe_configs(Vsn, Resources, Opts)

-spec describe_configs(vsn(), [Resources :: kpro:struct()], #{include_synonyms => boolean()}) -> req().
Make a describe_configs request. The include_synonyms option is only relevant when the API version is greater than 0.

encode(ClientName, CorrId, Req)

-spec encode(client_id(), corr_id(), req()) -> iodata().
Encode a request to bytes that can be sent on wire.
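
A minimal sketch of the build-then-encode flow, using a metadata request as the payload. The module and helper names are hypothetical, and the sketch assumes that topic names are binaries and that CorrId is an integer chosen by the caller to match responses to requests.

```erlang
-module(encode_example).
-export([topics_arg/1, metadata_bytes/4]).

%% Hypothetical helper: accept `all` or a list of topic names given as
%% strings or binaries, and normalise the names to binaries.
topics_arg(all) -> all;
topics_arg(Names) when is_list(Names) ->
    [iolist_to_binary(N) || N <- Names].

%% Build a metadata request, then encode it to iodata for sending.
metadata_bytes(Vsn, ClientId, CorrId, Topics) ->
    Req = kpro_req_lib:metadata(Vsn, topics_arg(Topics)),
    kpro_req_lib:encode(ClientId, CorrId, Req).
```

The returned iodata could then be handed to a socket send call such as gen_tcp:send/2.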

end_txn(TxnCtx, CommitOrAbort)

-spec end_txn(txn_ctx(), commit | abort) -> req().
Make end_txn request.
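
The transactional request builders can be combined roughly as sketched below. This is an illustration only: the module and function names are hypothetical, TxnCtx is assumed to have been obtained elsewhere, and whether add_offsets_to_txn is needed depends on whether consumer offsets are committed as part of the transaction.

```erlang
-module(txn_example).
-export([outcome/1, register_reqs/3, finish_req/2]).

%% Hypothetical mapping from the application's result to the
%% CommitOrAbort argument of end_txn/2.
outcome(ok) -> commit;
outcome({error, _Reason}) -> abort.

%% Requests that register the touched topic-partitions and the consumer
%% group with the transaction. TopicPartitions is [{Topic, Partition}].
register_reqs(TxnCtx, TopicPartitions, GroupId) ->
    [kpro_req_lib:add_partitions_to_txn(TxnCtx, TopicPartitions),
     kpro_req_lib:add_offsets_to_txn(TxnCtx, GroupId)].

%% Request that commits or aborts the transaction.
finish_req(TxnCtx, Result) ->
    kpro_req_lib:end_txn(TxnCtx, outcome(Result)).
```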

fetch(Vsn, Topic, Partition, Offset, Opts)

-spec fetch(vsn(), topic(), partition(), offset(), fetch_opts()) -> req().
Helper function to construct a fetch request against a single topic-partition. In transactional mode, set isolation_level to read_uncommitted in Opts to fetch uncommitted messages.

list_offsets(Vsn, Topic, Partition, Time)

-spec list_offsets(vsn(), topic(), partition(), msg_ts()) -> req().
Helper function to construct a list_offsets request against a single topic-partition.

list_offsets(Vsn, Topic, Partition, Time, IsolationLevel)

-spec list_offsets(vsn(), topic(), partition(), latest | earliest | msg_ts(), isolation_level()) ->
                      req().
Helper function to construct a list_offsets request against a single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.
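
A minimal sketch of choosing the Time argument, which per the spec may be latest, earliest, or a message timestamp. The position names accepted by time_arg/1 are hypothetical and exist only for this example; timestamps are assumed to be integers in milliseconds.

```erlang
-module(list_offsets_example).
-export([time_arg/1, build/4]).

%% Hypothetical mapping from a caller-facing start position to the
%% Time argument of list_offsets/5.
time_arg(beginning) -> earliest;
time_arg(end_of_log) -> latest;
time_arg({timestamp, Ms}) when is_integer(Ms) -> Ms.

%% Only committed offsets are listed here; pass read_uncommitted
%% instead to include offsets of open transactions.
build(Vsn, Topic, Partition, Position) ->
    kpro_req_lib:list_offsets(Vsn, Topic, Partition,
                              time_arg(Position), read_committed).
```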

list_offsets(Vsn, Topic, Partition, Time0, IsolationLevel, LeaderEpoch)

-spec list_offsets(vsn(),
                   topic(),
                   partition(),
                   latest | earliest | msg_ts(),
                   isolation_level(),
                   kpro:leader_epoch()) ->
                      req().
Extends list_offsets/5 with a leader-epoch number, which can be obtained from the metadata response for each partition.
-spec make(api(), vsn(), struct()) -> req().
Helper function to make a request body.
-spec metadata(vsn(), all | [topic()]) -> req().
Make a metadata request.

metadata(Vsn, Topics, IsAutoCreateAllowed)

-spec metadata(vsn(), all | [topic()], boolean()) -> req().
Make a metadata request.

produce(Vsn, Topic, Partition, Batch)

Helper function to construct a produce request.

produce(Vsn, Topic, Partition, Batch, Opts)

-spec produce(vsn(), topic(), partition(), binary() | batch_input(), produce_opts()) -> req().
Helper function to construct a produce request. By default, it constructs a non-transactional produce request. For transactional produce requests, the following conditions must be met:
  1. The Batch argument must be a [map()] to indicate magic v2, for example: [#{key => Key, value => Value, ts => Ts}]. The current system time is used if ts is missing from the batch input. It may also be a binary() if the user chooses to encode the batch beforehand. This can be helpful when a large batch is encoded in another process, so that a large binary rather than a list is passed between processes.
  2. first_sequence must exist in Opts. It should be the sequence number of the first message in the batch. Sequence numbers are maintained by the producer; they should start from zero and increase monotonically, with a separate sequence per topic-partition.
  3. txn_ctx (of type kpro:txn_ctx()) must exist in Opts.
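
The conditions above can be sketched as follows. The module and function names are hypothetical, TxnCtx is assumed to have been obtained elsewhere, and next_sequence/2 assumes that each message in a batch consumes one sequence number, which is how Kafka's idempotent producer counts sequences.

```erlang
-module(produce_example).
-export([batch/0, txn_opts/2, next_sequence/2, build_plain/3, build_txn/5]).

%% A magic-v2 batch: a list of maps. The first message omits ts, so the
%% current system time is used; the second carries an explicit timestamp.
batch() ->
    [#{key => <<"k1">>, value => <<"v1">>},
     #{key => <<"k2">>, value => <<"v2">>, ts => 1700000000000}].

%% The two options that a transactional produce request requires.
txn_opts(TxnCtx, FirstSeq) ->
    #{txn_ctx => TxnCtx, first_sequence => FirstSeq}.

%% Assumed bookkeeping: first_sequence for the following batch on the
%% same topic-partition.
next_sequence(FirstSeq, Batch) when is_list(Batch) ->
    FirstSeq + length(Batch).

%% Non-transactional request.
build_plain(Vsn, Topic, Partition) ->
    kpro_req_lib:produce(Vsn, Topic, Partition, batch()).

%% Transactional request.
build_txn(Vsn, Topic, Partition, TxnCtx, FirstSeq) ->
    kpro_req_lib:produce(Vsn, Topic, Partition, batch(),
                         txn_opts(TxnCtx, FirstSeq)).
```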

txn_offset_commit(GrpId, TxnCtx, Offsets, UserData)

-spec txn_offset_commit(group_id(), txn_ctx(), offsets_to_commit(), binary()) -> req().
Make a txn_offset_commit request.