kpro_req_lib (kafka_protocol v4.3.0)
Summary
Types
Options for a fetch request.
Functions
Make an add_offsets_to_txn request.
Make an add_partitions_to_txn request.
Make an alter_configs request.
Make a create_partitions request.
Make a create_topics request. If 0 is given as the timeout option, the request triggers the creation but returns immediately. The validate_only option is only relevant when the API version is greater than 0.
Make a delete_topics request.
Make a describe_configs request. The include_synonyms option is only relevant when the API version is greater than 0.
Encode a request to bytes that can be sent on the wire.
Make an end_txn request.
Helper function to construct a fetch request against a single topic-partition. In transactional mode, set IsolationLevel = kpro_read_uncommitted to fetch uncommitted messages.
Helper function to construct a list_offset request against a single topic-partition.
Helper function to construct a list_offset request against a single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.
Extends list_offsets/5 with a leader-epoch number, which can be obtained from the metadata response for each partition.
Helper function to make a request body.
Helper function to make a request body with a provided reference or process alias. The reference can be an alias used to receive the response to this request.
Make a metadata request.
Make a metadata request.
Helper function to construct a non-transactional produce request.
Helper function to construct a produce request.
Helper function to construct a produce request with a specified request reference. By default, it constructs a non-transactional produce request. The Batch argument can be a [map()] like [#{key => Key, value => Value, ts => Ts}]. The current system time is used if ts is missing in the batch input. It may also be binary() or {magic_v2, Bytes, iolist()} if the user chooses to encode the batch beforehand, which can be helpful when a large batch can be encoded in another process.
For transactional produce requests, the following conditions must be met:
- first_sequence must exist in Opts. It should be the sequence number for the first message in the batch. Sequence numbers are maintained by the producer; they should start from zero and increase monotonically, with one sequence number per topic-partition.
- txn_ctx (which is of spec kpro:txn_ctx()) must exist in Opts.
Make a txn_offset_commit request.
Types
-type api() :: kpro:api().
-type batch_input() :: kpro:batch_input().
-type client_id() :: kpro:client_id().
-type corr_id() :: kpro:corr_id().
-type count() :: kpro:count().
-type fetch_opts() :: #{max_wait_time => wait(), min_bytes => count(), max_bytes => count(), isolation_level => isolation_level(), session_id => kpro:int32(), session_epoch => kpro:int32(), rack_id => iodata()}.
Options for a fetch request.
It is a map with the following keys (all of them are optional):
- max_wait_time: The maximum time (in milliseconds) to block while waiting for enough messages to accumulate so that their total size is at least min_bytes bytes. The wait ends as soon as either min_bytes is satisfied or max_wait_time is exceeded, whichever comes first. Defaults to 1 second.
- min_bytes: The minimum size of the message set. If there are not enough messages, Kafka blocks and waits (but at most for max_wait_time). This implies that the response may actually be smaller if the time runs out. If set to 0, Kafka responds immediately (possibly with an empty message set). Use this option together with max_wait_time to tune throughput, latency, and the size of message sets. Defaults to 0.
- max_bytes: The maximum size of the message set. Note that this is not an absolute maximum: if the first message in the message set is larger than this value, the message is still returned to ensure that progress can be made. Defaults to 1 MB.
- isolation_level: Controls the visibility of transactional records. Using read_uncommitted makes all records visible. With read_committed, non-transactional and committed transactional records are visible. More concretely, read_committed returns all data from offsets smaller than the current LSO (last stable offset) and includes the list of aborted transactions in the result, which allows consumers to discard aborted transactional records. Defaults to read_committed.
- session_id: Fetch session ID. This can be useful when the fetch request spans multiple topic-partitions. However, fetch requests in kpro can span only a single topic-partition, so kpro by default does not use fetch sessions (by setting appropriate session_id and session_epoch default values). Defaults to 0.
- session_epoch: Fetch session epoch. The same note as for session_id applies. Defaults to -1.
- rack_id: The consumer's rack ID. This allows consumers to fetch from the closest replica (instead of the leader). Defaults to undefined.
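As a minimal sketch of how these keys combine, the module below builds two fetch_opts() maps. The module name, function names, and the specific values are hypothetical and chosen only for illustration; they are not recommendations from the library.

```erlang
-module(fetch_opts_example).
-export([low_latency/0, high_throughput/0]).

%% min_bytes = 0 asks Kafka to respond immediately,
%% possibly with an empty message set.
low_latency() ->
    #{max_wait_time => 100,       % milliseconds
      min_bytes => 0,
      max_bytes => 1 bsl 20}.     % 1 MB

%% Wait up to 5 seconds for at least 64 KB of messages.
%% The response may still be smaller if the time runs out.
high_throughput() ->
    #{max_wait_time => 5000,      % milliseconds
      min_bytes => 64 * 1024,
      max_bytes => 4 * (1 bsl 20),
      isolation_level => read_committed}.
```

Keys that are left out of the map fall back to the defaults listed above.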
-type group_id() :: kpro:group_id().
-type isolation_level() :: kpro:isolation_level().
-type msg_ts() :: kpro:msg_ts().
-type offset() :: kpro:offset().
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type partition() :: kpro:partition().
-type produce_opts() :: kpro:produce_opts().
-type req() :: kpro:req().
-type struct() :: kpro:struct().
-type topic() :: kpro:topic().
-type txn_ctx() :: kpro:txn_ctx().
-type vsn() :: kpro:vsn().
-type wait() :: kpro:wait().
Functions
Make an add_offsets_to_txn request.
Make an add_partitions_to_txn request.
-spec alter_configs(vsn(), [Resources :: kpro:struct()], #{validate_only => boolean()}) -> req().
Make an alter_configs request.
-spec create_partitions(vsn(), [Topics :: kpro:struct()], #{timeout => kpro:int32(), validate_only => boolean()}) -> req().
Make a create_partitions request.
-spec create_topics(vsn(), [Topics :: kpro:struct()], #{timeout => kpro:int32(), validate_only => boolean()}) -> req().
Make a create_topics request. If 0 is given as the timeout option, the request triggers the creation but returns immediately. The validate_only option is only relevant when the API version is greater than 0.
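A hedged sketch of a fire-and-forget topic creation, based on the spec above. The module and function names are hypothetical, and the field names inside the topic struct (name, num_partitions, replication_factor, assignments, configs) are assumptions about the create_topics schema; verify them against the schema for the API version in use.

```erlang
-module(create_topics_example).
-export([topic_struct/1, request/2]).

%% Build one topic entry as a map.
%% Assumption: these field names match the create_topics schema.
topic_struct(Name) ->
    #{name => Name,
      num_partitions => 1,
      replication_factor => 1,
      assignments => [],
      configs => []}.

%% timeout = 0: trigger the creation and return immediately.
%% validate_only only matters when Vsn is greater than 0.
request(Vsn, Names) ->
    Topics = [topic_struct(N) || N <- Names],
    Opts = #{timeout => 0, validate_only => false},
    kpro_req_lib:create_topics(Vsn, Topics, Opts).
```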
-spec delete_topics(vsn(), [topic()], #{timeout => kpro:int32()}) -> req().
Make a delete_topics request.
-spec describe_configs(vsn(), [Resources :: kpro:struct()], #{include_synonyms => boolean()}) -> req().
Make a describe_configs request. The include_synonyms option is only relevant when the API version is greater than 0.
Encode a request to bytes that can be sent on the wire.
Make an end_txn request.
Helper function to construct a fetch request against a single topic-partition. In transactional mode, set IsolationLevel = kpro_read_uncommitted to fetch uncommitted messages.
Helper function to construct a list_offset request against a single topic-partition.
-spec list_offsets(vsn(), topic(), partition(), latest | earliest | msg_ts(), isolation_level()) -> req().
Helper function to construct a list_offset request against a single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.
-spec list_offsets(vsn(), topic(), partition(), latest | earliest | msg_ts(), isolation_level(), kpro:leader_epoch()) -> req().
Extends list_offsets/5 with a leader-epoch number, which can be obtained from the metadata response for each partition.
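The two specs above might be used as in the following sketch. The module and function names are hypothetical. Two assumptions are made: that msg_ts() is a timestamp in milliseconds since the Unix epoch, and that the isolation level can be passed as the atom read_committed (the value named in the fetch options).

```erlang
-module(list_offsets_example).
-export([ms_ago/1, latest/3, since/5]).

%% Timestamp for a point Seconds in the past.
%% Assumption: msg_ts() is in milliseconds since the Unix epoch.
ms_ago(Seconds) ->
    erlang:system_time(millisecond) - Seconds * 1000.

%% Request the latest offset under read_committed isolation.
latest(Vsn, Topic, Partition) ->
    kpro_req_lib:list_offsets(Vsn, Topic, Partition, latest, read_committed).

%% Look up an offset by timestamp, passing a leader epoch that the
%% caller took from a metadata response for this partition.
since(Vsn, Topic, Partition, Seconds, LeaderEpoch) ->
    kpro_req_lib:list_offsets(Vsn, Topic, Partition, ms_ago(Seconds),
                              read_committed, LeaderEpoch).
```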
Helper function to make a request body.
Helper function to make a request body with a provided reference or process alias. The reference can be an alias used to receive the response to this request.
Make a metadata request.
Make a metadata request.
-spec produce(vsn(), topic(), partition(), binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}) -> req().
Helper function to construct a non-transactional produce request.
See also: produce/6.
-spec produce(vsn(), topic(), partition(), binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}, produce_opts()) -> req().
Helper function to construct a produce request.
See also: produce/6.
-spec produce(vsn(), topic(), partition(), binary() | batch_input() | {magic_v2, non_neg_integer(), iolist()}, produce_opts(), kpro:req_ref()) -> req().
Helper function to construct a produce request with a specified request reference. By default, it constructs a non-transactional produce request. The Batch argument can be a [map()] like [#{key => Key, value => Value, ts => Ts}]. The current system time is used if ts is missing in the batch input. It may also be binary() or {magic_v2, Bytes, iolist()} if the user chooses to encode the batch beforehand, which can be helpful when a large batch can be encoded in another process.
For transactional produce requests, the following conditions must be met:
- first_sequence must exist in Opts. It should be the sequence number for the first message in the batch. Sequence numbers are maintained by the producer; they should start from zero and increase monotonically, with one sequence number per topic-partition.
- txn_ctx (which is of spec kpro:txn_ctx()) must exist in Opts.
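The batch forms and the transactional requirements described above can be sketched as follows. The module and function names are hypothetical. The sketch assumes that produce_opts() is a map and that TxnCtx was obtained elsewhere when the transaction was set up; how to obtain it is outside the scope of this example.

```erlang
-module(produce_example).
-export([batch/1, plain/4, transactional/6]).

%% Turn [{Key, Value}] pairs into the [map()] batch form.
%% ts is omitted, so the current system time is used.
batch(KVs) ->
    [#{key => K, value => V} || {K, V} <- KVs].

%% Non-transactional produce request with default options.
plain(Vsn, Topic, Partition, KVs) ->
    kpro_req_lib:produce(Vsn, Topic, Partition, batch(KVs)).

%% Transactional produce request: both txn_ctx and first_sequence
%% are present in Opts. FirstSeq is tracked by the caller, one
%% counter per topic-partition, starting from zero.
transactional(Vsn, Topic, Partition, KVs, TxnCtx, FirstSeq) ->
    Opts = #{txn_ctx => TxnCtx, first_sequence => FirstSeq},
    kpro_req_lib:produce(Vsn, Topic, Partition, batch(KVs), Opts).
```

After a batch is sent, a caller would typically advance its per-partition counter by length(KVs) before building the next transactional request.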
-spec txn_offset_commit(group_id(), txn_ctx(), offsets_to_commit(), binary()) -> req().
Make a txn_offset_commit request.