kpro_req_lib (kafka_protocol v4.1.9)
Summary
Types
Options for a fetch request.
Functions
Make an add_offsets_to_txn request.
Make an add_partitions_to_txn request.
Make an alter_configs request.
Make a create_partitions request.
Make a create_topics request. If 0 is given as the timeout option, the request triggers a creation but returns immediately. The validate_only option is only relevant when the API version is greater than 0.
Make a delete_topics request.
Make a describe_configs request. The include_synonyms option is only relevant when the API version is greater than 0.
Encode a request to bytes that can be sent on the wire.
Make an end_txn request.
Helper function to construct a fetch request against a single topic-partition. In transactional mode, set IsolationLevel = kpro_read_uncommitted to fetch uncommitted messages.
Helper function to construct a list_offset request against a single topic-partition.
Helper function to construct a list_offset request against a single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.
Extends list_offsets/5 with the leader-epoch number, which can be obtained from the metadata response for each partition.
Helper function to make a request body.
Make a metadata request.
Make a metadata request.
Helper function to construct a produce request.
Helper function to construct a produce request. By default, it constructs a non-transactional produce request. For transactional produce requests, the conditions below should be met.
1. Batch arg must be a [map()] to indicate magic v2, for example: [#{key => Key, value => Value, ts => Ts}]. The current system time is used if ts is missing in the batch input. It may also be a binary() if the user chooses to encode a batch beforehand. This can be helpful when a large batch is encoded in another process, so that a large binary rather than a list is passed between processes.
2. first_sequence must exist in Opts. It should be the sequence number for the first message in the batch. Maintained by the producer, sequence numbers should start from zero and be monotonically increasing, with one sequence number per topic-partition.
3. txn_ctx (which is of spec kpro:txn_ctx()) must exist in Opts.
Make a txn_offset_commit request.
Types
-type api() :: kpro:api().
-type batch_input() :: kpro:batch_input().
-type client_id() :: kpro:client_id().
-type corr_id() :: kpro:corr_id().
-type count() :: kpro:count().
-type fetch_opts() :: #{max_wait_time => wait(), min_bytes => count(), max_bytes => count(), isolation_level => isolation_level(), session_id => kpro:int32(), session_epoch => kpro:int32(), rack_id => iodata()}.
Options for a fetch request.
max_wait_time: The maximum time (in milliseconds) to block waiting until enough messages have accumulated to total at least min_bytes bytes. The wait ends as soon as either min_bytes is satisfied or max_wait_time is exceeded, whichever comes first. Defaults to 1 second.
min_bytes: The minimum size of the message set. If there are not enough messages, Kafka blocks (but at most for max_wait_time). This implies that the response may actually be smaller if the time runs out. If set to 0, Kafka responds immediately (possibly with an empty message set). This option can be used together with max_wait_time to tune throughput, latency, and the size of message sets. Defaults to 0.
max_bytes: The maximum size of the message set. Note that this is not an absolute maximum: if the first message in the message set is larger than this value, the message is still returned to ensure that progress can be made. Defaults to 1 MB.
isolation_level: This setting controls the visibility of transactional records. Using read_uncommitted makes all records visible. With read_committed, non-transactional and committed transactional records are visible. More concretely, read_committed returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard aborted transactional records. Defaults to read_committed.
session_id: Fetch session ID. This can be useful when the fetch request spans multiple topic-partitions. However, fetch requests in kpro can span only a single topic-partition, so kpro by default does not use fetch sessions (by setting appropriate session_id and session_epoch default values). Defaults to 0.
session_epoch: Fetch session epoch. The same considerations as for session_id apply. Defaults to -1.
rack_id: The consumer's rack ID. This allows consumers to fetch from the closest replica (instead of the leader). Defaults to undefined.
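As a minimal sketch of how these options combine, the map below asks the broker to wait at most 500 ms for at least 1 KB of data, caps the response at roughly 1 MB, and returns only committed records. The module name fetch_opts_example and the concrete values are illustrative assumptions, not part of the library; session_id, session_epoch and rack_id are left out so that the defaults described above apply.

```erlang
-module(fetch_opts_example).
-export([opts/0]).

%% Illustrative fetch options (hypothetical values).
%% Every key in fetch_opts() is optional; omitted keys take their defaults.
opts() ->
    #{ max_wait_time => 500               %% block for at most 500 milliseconds
     , min_bytes => 1024                  %% respond once at least 1 KB is available
     , max_bytes => 1048576               %% soft cap of 1 MB on the message set
     , isolation_level => read_committed  %% hide uncommitted transactional records
     }.
```

The resulting map would be passed as the options argument when constructing a fetch request.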
-type group_id() :: kpro:group_id().
-type isolation_level() :: kpro:isolation_level().
-type msg_ts() :: kpro:msg_ts().
-type offset() :: kpro:offset().
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type produce_opts() :: kpro:produce_opts().
-type req() :: kpro:req().
-type struct() :: kpro:struct().
-type topic() :: kpro:topic().
-type txn_ctx() :: kpro:txn_ctx().
-type vsn() :: kpro:vsn().
-type wait() :: kpro:wait().
Functions
Make an add_offsets_to_txn request.
Make an add_partitions_to_txn request.
-spec alter_configs(vsn(), [Resources :: kpro:struct()], #{validate_only => boolean()}) -> req().
Make an alter_configs request.
-spec create_partitions(vsn(), [Topics :: kpro:struct()], #{timeout => kpro:int32(), validate_only => boolean()}) -> req().
Make a create_partitions request.
-spec create_topics(vsn(), [Topics :: kpro:struct()], #{timeout => kpro:int32(), validate_only => boolean()}) -> req().
Make a create_topics request. If 0 is given as the timeout option, the request triggers a creation but returns immediately. The validate_only option is only relevant when the API version is greater than 0.
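The timeout behaviour can be sketched as follows, based on the create_topics/3 spec shown here. The module name create_topics_example is hypothetical, and the topic structs are taken as an argument because their field names are not given in this document.

```erlang
-module(create_topics_example).
-export([opts/0, request/2]).

%% timeout => 0 triggers the creation without waiting for it to complete.
%% validate_only only matters when the API version is greater than 0.
opts() ->
    #{timeout => 0, validate_only => false}.

%% Vsn is the create_topics API version; Topics is a list of
%% kpro:struct() topic definitions prepared by the caller.
request(Vsn, Topics) ->
    kpro_req_lib:create_topics(Vsn, Topics, opts()).
```

Because the request returns immediately with a zero timeout, a caller that needs the topic to exist would have to check for it separately, for example through a metadata request.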
-spec delete_topics(vsn(), [topic()], #{timeout => kpro:int32()}) -> req().
Make a delete_topics request.
-spec describe_configs(vsn(), [Resources :: kpro:struct()], #{include_synonyms => boolean()}) -> req().
Make a describe_configs request. The include_synonyms option is only relevant when the API version is greater than 0.
Make an end_txn request.
Helper function to construct a fetch request against a single topic-partition. In transactional mode, set IsolationLevel = kpro_read_uncommitted to fetch uncommitted messages.
Helper function to construct a list_offset request against a single topic-partition.
-spec list_offsets(vsn(), topic(), partition(), latest | earliest | msg_ts(), isolation_level()) -> req().
Helper function to construct a list_offset request against a single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.
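A minimal sketch of calling list_offsets/5 according to the spec above. The module and function names are hypothetical. The plain atoms read_committed and read_uncommitted are used on the assumption that they are what the ?kpro_read_committed and ?kpro_read_uncommitted macros expand to, consistent with the isolation_level values described for fetch_opts().

```erlang
-module(list_offsets_example).
-export([latest_committed/3, earliest_uncommitted/3]).

%% Request the latest offset visible to a read_committed consumer.
latest_committed(Vsn, Topic, Partition) ->
    kpro_req_lib:list_offsets(Vsn, Topic, Partition, latest, read_committed).

%% Request the earliest offset, including uncommitted transactional data.
earliest_uncommitted(Vsn, Topic, Partition) ->
    kpro_req_lib:list_offsets(Vsn, Topic, Partition, earliest, read_uncommitted).
```

The fourth argument may also be a msg_ts() timestamp instead of latest or earliest.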
list_offsets(Vsn, Topic, Partition, Time0, IsolationLevel, LeaderEpoch)
-spec list_offsets(vsn(), topic(), partition(), latest | earliest | msg_ts(), isolation_level(), kpro:leader_epoch()) -> req().
Extends list_offsets/5 with the leader-epoch number, which can be obtained from the metadata response for each partition.
Make a metadata request.
Make a metadata request.
-spec produce(vsn(), topic(), partition(), binary() | batch_input(), produce_opts()) -> req().
Helper function to construct a produce request. By default, it constructs a non-transactional produce request. For transactional produce requests, the conditions below should be met.
1. Batch arg must be a [map()] to indicate magic v2, for example: [#{key => Key, value => Value, ts => Ts}]. The current system time is used if ts is missing in the batch input. It may also be a binary() if the user chooses to encode a batch beforehand. This can be helpful when a large batch is encoded in another process, so that a large binary rather than a list is passed between processes.
2. first_sequence must exist in Opts. It should be the sequence number for the first message in the batch. Maintained by the producer, sequence numbers should start from zero and be monotonically increasing, with one sequence number per topic-partition.
3. txn_ctx (which is of spec kpro:txn_ctx()) must exist in Opts.
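The batch format described above can be sketched as follows for the non-transactional case. The module name produce_example, the keys, values and timestamp are illustrative assumptions, and passing an empty options map assumes that every produce option has a default.

```erlang
-module(produce_example).
-export([batch/0, request/3]).

%% A magic v2 batch is a list of maps. The second message omits ts,
%% so the current system time would be used for it.
batch() ->
    [ #{key => <<"k1">>, value => <<"v1">>, ts => 1700000000000}
    , #{key => <<"k2">>, value => <<"v2">>}
    ].

%% Non-transactional request: Opts carries neither first_sequence nor txn_ctx.
%% The empty map is an assumption that all produce options are optional.
request(Vsn, Topic, Partition) ->
    kpro_req_lib:produce(Vsn, Topic, Partition, batch(), #{}).
```

For a transactional request, Opts would additionally need first_sequence and txn_ctx, as listed in conditions 2 and 3.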
-spec txn_offset_commit(group_id(), txn_ctx(), offsets_to_commit(), binary()) -> req().
Make a txn_offset_commit request.