Module kpro_req_lib

Data Types

api()

api() = kpro:api()

batch_input()

batch_input() = kpro:batch_input()

client_id()

client_id() = kpro:client_id()

corr_id()

corr_id() = kpro:corr_id()

count()

count() = kpro:count()

fetch_opts()

fetch_opts() = #{max_wait_time => wait(), min_bytes => count(), max_bytes => count(), isolation_level => isolation_level(), session_id => kpro:int32(), session_epoch => kpro:int32(), rack_id => iodata()}

group_id()

group_id() = kpro:group_id()

isolation_level()

isolation_level() = kpro:isolation_level()

msg_ts()

msg_ts() = kpro:msg_ts()

offset()

offset() = kpro:offset()

offsets_to_commit()

offsets_to_commit() = kpro:offsets_to_commit()

partition()

partition() = kpro:partition()

produce_opts()

produce_opts() = kpro:produce_opts()

req()

req() = kpro:req()

struct()

struct() = kpro:struct()

topic()

topic() = kpro:topic()

txn_ctx()

txn_ctx() = kpro:txn_ctx()

vsn()

vsn() = kpro:vsn()

wait()

wait() = kpro:wait()

Function Index

add_offsets_to_txn/2Make add_offsets_to_txn request.
add_partitions_to_txn/2Make add_partitions_to_txn request.
alter_configs/3Make an alter_configs request.
create_partitions/3Make a create_partitions request.
create_topics/3Make create_topics request.
delete_topics/3Make delete_topics request.
describe_configs/3Make a describe_configs request.
encode/3Encode a request to bytes that can be sent on wire.
end_txn/2Make end_txn request.
fetch/5Help function to construct a fetch request against one single topic-partition.
list_offsets/4Help function to contruct a list_offset request against one single topic-partition.
list_offsets/5Help function to contruct a list_offset request against one single topic-partition.
list_offsets/6Extends list_offsets/5 with leader-epoch number which can be obtained from metadata response for each partition.
make/3Help function to make a request body.
metadata/2Make a metadata request.
metadata/3Make a metadata request.
produce/4Help function to construct a produce request.
produce/5Help function to construct a produce request.
txn_offset_commit/4Make a txn_offset_commit request.

Function Details

add_offsets_to_txn/2

add_offsets_to_txn(TxnCtx::txn_ctx(), CgId::group_id()) -> req()

Make add_offsets_to_txn request.

add_partitions_to_txn/2

add_partitions_to_txn(TxnCtx::txn_ctx(), TopicPartitionList::[{topic(), partition()}]) -> req()

Make add_partitions_to_txn request.

alter_configs/3

alter_configs(Vsn::vsn(), Resources::[Resources::kpro:struct()], Opts::#{validate_only => boolean()}) -> req()

Make an alter_configs request.

create_partitions/3

create_partitions(Vsn::vsn(), Topics::[Topics::kpro:struct()], Opts::#{timeout => kpro:int32(), validate_only => boolean()}) -> req()

Make a create_partitions request.

create_topics/3

create_topics(Vsn::vsn(), Topics::[Topics::kpro:struct()], Opts::#{timeout => kpro:int32(), validate_only => boolean()}) -> req()

Make create_topics request. if 0 is given as timeout option the request will trigger a creation but return immediately. validate_only option is only relevant when the API version is greater than 0.

delete_topics/3

delete_topics(Vsn::vsn(), Topics::[topic()], Opts::#{timeout => kpro:int32()}) -> req()

Make delete_topics request.

describe_configs/3

describe_configs(Vsn::vsn(), Resources::[Resources::kpro:struct()], Opts::#{include_synonyms => boolean()}) -> req()

Make a describe_configs request. include_synonyms option is only relevant when the API version is greater than 0.

encode/3

encode(ClientName::client_id(), CorrId::corr_id(), Req::req()) -> iodata()

Encode a request to bytes that can be sent on wire.

end_txn/2

end_txn(TxnCtx::txn_ctx(), CommitOrAbort::commit | abort) -> req()

Make end_txn request.

fetch/5

fetch(Vsn::vsn(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts()) -> req()

Help function to construct a fetch request against one single topic-partition. In transactional mode, set IsolationLevel = kpro_read_uncommitted to fetch uncommitted messages.

list_offsets/4

list_offsets(Vsn::vsn(), Topic::topic(), Partition::partition(), Time::msg_ts()) -> req()

Help function to contruct a list_offset request against one single topic-partition.

list_offsets/5

list_offsets(Vsn::vsn(), Topic::topic(), Partition::partition(), Time::latest | earliest | msg_ts(), IsolationLevel::isolation_level()) -> req()

Help function to contruct a list_offset request against one single topic-partition. In transactional mode, set IsolationLevel = ?kpro_read_uncommitted to list uncommitted offsets.

list_offsets/6

list_offsets(Vsn::vsn(), Topic::topic(), Partition::partition(), Time0::latest | earliest | msg_ts(), IsolationLevel::isolation_level(), LeaderEpoch::kpro:leader_epoch()) -> req()

Extends list_offsets/5 with leader-epoch number which can be obtained from metadata response for each partition.

make/3

make(API::api(), Vsn::vsn(), Fields::struct()) -> req()

Help function to make a request body.

metadata/2

metadata(Vsn::vsn(), Topics::all | [topic()]) -> req()

Make a metadata request

metadata/3

metadata(Vsn::vsn(), Topics::all | [topic()], IsAutoCreateAllowed::boolean()) -> req()

Make a metadata request

produce/4

produce(Vsn, Topic, Partition, Batch) -> any()

Help function to construct a produce request.

produce/5

produce(Vsn::vsn(), Topic::topic(), Partition::partition(), Batch::binary() | batch_input(), Opts::produce_opts()) -> req()

Help function to construct a produce request. By default, it constructs a non-transactional produce request. For transactional produce requests, below conditions should be met. 1. Batch arg must be be a [map()] to indicate magic v2, for example: [#{key => Key, value => Value, ts => Ts}]. Current system time will be taken if ts is missing in batch input. It may also be binary() if user choose to encode a batch beforehand. This could be helpful when a large batch can be encoded in another process, so it may pass large binary instead of list between processes. 2. first_sequence must exist in Opts. It should be the sequence number for the fist message in batch. Maintained by producer, sequence numbers should start from zero and be monotonically increasing, with one sequence number per topic-partition. 3. txn_ctx (which is of spec kpro:txn_ctx()) must exist in Opts

txn_offset_commit/4

txn_offset_commit(GrpId::group_id(), TxnCtx::txn_ctx(), Offsets::offsets_to_commit(), UserData::binary()) -> req()

Make a txn_offset_commit request.


Generated by EDoc