api() = kpro:api()
batch_input() = kpro:batch_input()
client_id() = kpro:client_id()
corr_id() = kpro:corr_id()
count() = kpro:count()
fetch_opts() = #{max_wait_time => wait(), min_bytes => count(), max_bytes => count(), isolation_level => isolation_level(), session_id => kpro:int32(), session_epoch => kpro:int32(), rack_id => iodata()}
group_id() = kpro:group_id()
isolation_level() = kpro:isolation_level()
msg_ts() = kpro:msg_ts()
offset() = kpro:offset()
offsets_to_commit() = kpro:offsets_to_commit()
partition() = kpro:partition()
produce_opts() = kpro:produce_opts()
req() = kpro:req()
struct() = kpro:struct()
topic() = kpro:topic()
txn_ctx() = kpro:txn_ctx()
vsn() = kpro:vsn()
wait() = kpro:wait()
add_offsets_to_txn/2 | Make add_offsets_to_txn request. |
add_partitions_to_txn/2 | Make add_partitions_to_txn request. |
alter_configs/3 | Make an alter_configs request. |
create_partitions/3 | Make a create_partitions request. |
create_topics/3 | Make create_topics request. |
delete_topics/3 | Make delete_topics request. |
describe_configs/3 | Make a describe_configs request. |
encode/3 | Encode a request to bytes that can be sent on wire. |
end_txn/2 | Make end_txn request. |
fetch/5 | Help function to construct a fetch request
against one single topic-partition. |
list_offsets/4 | Help function to contruct a list_offset request
against one single topic-partition. |
list_offsets/5 | Help function to contruct a list_offset request against one single
topic-partition. |
list_offsets/6 | Extends list_offsets/5 with leader-epoch number which can be obtained
from metadata response for each partition. |
make/3 | Help function to make a request body. |
metadata/2 | Make a metadata request. |
metadata/3 | Make a metadata request. |
produce/4 | Help function to construct a produce request. |
produce/5 | Help function to construct a produce request. |
txn_offset_commit/4 | Make a txn_offset_commit request. |
add_offsets_to_txn(TxnCtx::txn_ctx(), CgId::group_id()) -> req()
Make add_offsets_to_txn
request.
add_partitions_to_txn(TxnCtx::txn_ctx(), TopicPartitionList::[{topic(), partition()}]) -> req()
Make add_partitions_to_txn
request.
alter_configs(Vsn::vsn(), Resources::[Resources::kpro:struct()], Opts::#{validate_only => boolean()}) -> req()
Make an alter_configs
request.
create_partitions(Vsn::vsn(), Topics::[Topics::kpro:struct()], Opts::#{timeout => kpro:int32(), validate_only => boolean()}) -> req()
Make a create_partitions
request.
create_topics(Vsn::vsn(), Topics::[Topics::kpro:struct()], Opts::#{timeout => kpro:int32(), validate_only => boolean()}) -> req()
Make create_topics
request.
if 0 is given as timeout
option the request will trigger a creation
but return immediately.
validate_only
option is only relevant when the API version is
greater than 0.
delete_topics(Vsn::vsn(), Topics::[topic()], Opts::#{timeout => kpro:int32()}) -> req()
Make delete_topics
request.
describe_configs(Vsn::vsn(), Resources::[Resources::kpro:struct()], Opts::#{include_synonyms => boolean()}) -> req()
Make a describe_configs
request.
include_synonyms
option is only relevant when the API version is
greater than 0.
encode(ClientName::client_id(), CorrId::corr_id(), Req::req()) -> iodata()
Encode a request to bytes that can be sent on wire.
Make end_txn
request.
fetch(Vsn::vsn(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts()) -> req()
Help function to construct a fetch
request
against one single topic-partition. In transactional mode, set
IsolationLevel = kpro_read_uncommitted
to fetch uncommitted messages.
list_offsets(Vsn::vsn(), Topic::topic(), Partition::partition(), Time::msg_ts()) -> req()
Help function to contruct a list_offset
request
against one single topic-partition.
list_offsets(Vsn::vsn(), Topic::topic(), Partition::partition(), Time::latest | earliest | msg_ts(), IsolationLevel::isolation_level()) -> req()
Help function to contruct a list_offset
request against one single
topic-partition. In transactional mode,
set IsolationLevel = ?kpro_read_uncommitted
to list uncommitted offsets.
list_offsets(Vsn::vsn(), Topic::topic(), Partition::partition(), Time0::latest | earliest | msg_ts(), IsolationLevel::isolation_level(), LeaderEpoch::kpro:leader_epoch()) -> req()
Extends list_offsets/5
with leader-epoch number which can be obtained
from metadata response for each partition.
Help function to make a request body.
Make a metadata
request
Make a metadata
request
produce(Vsn, Topic, Partition, Batch) -> any()
Help function to construct a produce request.
produce(Vsn::vsn(), Topic::topic(), Partition::partition(), Batch::binary() | batch_input(), Opts::produce_opts()) -> req()
Help function to construct a produce request.
By default, it constructs a non-transactional produce request.
For transactional produce requests, below conditions should be met.
1. Batch
arg must be be a [map()]
to indicate magic v2,
for example: [#{key => Key, value => Value, ts => Ts}]
.
Current system time will be taken if ts
is missing in batch input.
It may also be binary()
if user choose to encode a batch beforehand.
This could be helpful when a large batch can be encoded in another
process, so it may pass large binary instead of list between processes.
2. first_sequence
must exist in Opts
.
It should be the sequence number for the fist message in batch.
Maintained by producer, sequence numbers should start from zero and be
monotonically increasing, with one sequence number per topic-partition.
3. txn_ctx
(which is of spec kpro:txn_ctx()
) must exist in Opts
txn_offset_commit(GrpId::group_id(), TxnCtx::txn_ctx(), Offsets::offsets_to_commit(), UserData::binary()) -> req()
Make a txn_offset_commit
request.
Generated by EDoc