api() = atom()
batch() = {batch_meta(), [message()]}
batch_decode_result() = {incomplete_batch, int32()} | [batch()]
Describes offset_ud(): an offset, or an offset plus associated user-data, to commit.
batch_input() = [msg_input()]
batch_meta() = undefined | #{batch_meta_key() => batch_meta_val()}
batch_meta_key() = is_transaction | is_control | last_offset | max_ts | producer_id
batch_meta_val() = boolean() | atom() | integer()
bytes() = binary()
client_id() = binary()
compress_option() = no_compression | gzip | snappy | lz4
conn_config() = kpro_connection:config()
connection() = kpro_connection:connection()
coordinator_type() = group | txn
corr_id() = int32()
count() = non_neg_integer()
decode_fun() = fun((binary()) -> {field_value(), binary()})
endpoint() = {hostname(), portnum()}
error_code() = int16() | atom()
fetch_opts() = kpro_req_lib:fetch_opts()
field_name() = atom() | field_tag()
field_tag() = integer()
field_value() = primitive() | [primitive()] | struct() | [struct()]
group_id() = binary()
header_key() = binary()
header_val() = binary()
headers() = [{header_key(), header_val()}]
hostname() = binary() | inet:hostname()
incomplete_batch() = {incomplete_batch, int32()}
int16() = -32768..32767
int32() = -2147483648..2147483647
int64() = -9223372036854775808..9223372036854775807
int8() = -128..127
isolation_level() = read_committed | read_uncommitted
key() = binary()
leader_epoch() = int32()
magic() = 0..2
message() = #kafka_message{offset = kpro:offset(), key = kpro:bytes(), value = kpro:bytes(), ts_type = undefined | kpro:timestamp_type(), ts = undefined | kpro:int64(), headers = kpro:headers()}
msg_input() = #{headers => headers(), ts => msg_ts(), key => key(), value => value()}
msg_ts() = int64()
offset() = int64()
offset_ud() = offset() | {offset(), binary()}
offsets_to_commit() = #{{topic(), partition()} => offset_ud()} | [{{topic(), partition()}, offset_ud()}]
partition() = int32()
portnum() = non_neg_integer()
primitive() = integer() | string() | binary() | atom()
primitive_type() = boolean | int8 | int16 | int32 | int64 | varint | string | nullable_string | bytes | records | unsigned_varint
produce_opts() = #{compression => compress_option(), required_acks => required_acks(), ack_timeout => wait(), txn_ctx => txn_ctx(), first_sequence => seqno()}
producer_epoch() = int16()
producer_id() = int64()
protocol() = plaintext | ssl | sasl_plaintext | sasl_ssl
req() = #kpro_req{ref = reference(), api = kpro:api(), vsn = kpro:vsn(), no_ack = boolean(), msg = iodata() | kpro:struct()}
required_acks() = -1..1 | all_isr | none | leader_only
rsp() = #kpro_rsp{ref = false | reference(), api = kpro:api(), vsn = kpro:vsn(), msg = binary() | kpro:struct()}
schema() = primitive_type() | struct_schema() | {array, schema()} | decode_fun()
decode_fun() is a caller-defined decoder.
seqno() = integer()
Describes produce_opts(): optional args to make a produce request.
stack() = [{api(), vsn()} | field_name()]
encode / decode stack
str() = undefined | string() | binary()
struct() = #{field_name() => field_value()} | [{field_name(), field_value()}]
struct_schema() = [{field_name(), schema()}]
timestamp_type() = undefined | create | append
topic() = binary()
transactional_id() = binary()
txn_ctx() = #{connection => connection(), transactional_id => transactional_id(), producer_id => producer_id(), producer_epoch => producer_epoch()}
value() = binary()
vsn() = non_neg_integer()
vsn_ranges() = #{api() => vsn_range()}
wait() = non_neg_integer()
close_connection/1 | Stop connection process. |
connect/2 | Connect to the given endpoint. |
connect_any/2 | Connect to any of the endpoints in the given list. |
connect_controller/2 | |
connect_controller/3 | Connect to the controller broker of the kafka cluster. |
connect_coordinator/3 | Connect to group or transaction coordinator. |
connect_partition_leader/4 | Connect partition leader. |
connect_partition_leader/5 | Connect partition leader. |
decode_batches/1 | The message-set is not decoded upon receiving (in connection process). |
discover_coordinator/4 | Discover group or transactional coordinator. |
discover_partition_leader/4 | Discover partition leader broker endpoint. |
encode_batch/3 | Encode message batch for produce request. |
encode_request/3 | Encode request to byte stream. |
find/2 | |
find/3 | |
get_api_versions/1 | Query API versions using the given kpro_connection pid. |
get_api_vsn_range/2 | Get version range for the given API. |
make_request/3 | Helper function to make a request. |
parse_endpoints/1 | |
parse_endpoints/2 | |
produce_api_vsn_to_magic_vsn/1 | |
provide_compression/1 | Set snappy or lz4 compression modules. |
request_async/2 | Send a request without waiting for reply. |
request_sync/3 | Send a request, wait for response. |
send/2 | Same as request_async/2. |
txn_abort/1 | Abort transaction. |
txn_abort/2 | Abort transaction. |
txn_commit/1 | Commit transaction. |
txn_commit/2 | Commit transaction. |
txn_init_ctx/2 | Initialize a transaction context, the connection should be established towards transactional coordinator broker. |
txn_init_ctx/3 | Initialize a transaction context, the connection should be established towards transactional coordinator broker. |
txn_offset_commit/4 | see txn_offset_commit/5. |
txn_offset_commit/5 | Send transactional offset commit request to group coordinator. |
txn_send_cg/2 | |
txn_send_cg/3 | Add consumed offsets to transaction. |
txn_send_partitions/2 | |
txn_send_partitions/3 | Add partitions to transaction. |
close_connection(Connection::connection()) -> ok
Stop connection process.
connect(Endpoint::endpoint(), ConnConfig::conn_config()) -> {ok, connection()} | {error, any()}
Connect to the given endpoint.
NOTE: Connection process is linked to caller unless nolink => true is set in connection config.
connect_any(Endpoints::[endpoint()], ConnConfig::conn_config()) -> {ok, connection()} | {error, any()}
Connect to any of the endpoints in the given list.
NOTE: Connection process is linked to caller unless nolink => true is set in connection config.
connect_controller(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config()) -> {ok, connection()} | {error, any()}
See also: connect_controller/3.
connect_controller(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Opts::#{timeout => timeout()}) -> {ok, connection()} | {error, any()}
Connect to the controller broker of the kafka cluster.
connect_coordinator(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Args::#{type => coordinator_type(), id => binary(), timeout => timeout()}) -> {ok, connection()} | {error, any()}
Connect to group or transaction coordinator. If the first arg is not a connection pid but a list of bootstrapping endpoints, it will first try to connect to any of the nodes. NOTE: 'txn' type is only applicable to kafka 0.11 or later.
connect_partition_leader(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Topic::topic(), Partition::partition()) -> {ok, connection()} | {error, any()}
Connect partition leader.
If the first arg is not an already established metadata connection
but a bootstrapping endpoint list, this function will first try to
establish a temp connection to any of the bootstrapping endpoints.
Then send a metadata request to discover the partition leader broker.
Finally connect to the leader broker.
NOTE: Connection process is linked to caller unless nolink => true
is set in connection config.
connect_partition_leader(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Topic::topic(), Partition::partition(), Opts::#{timeout => timeout()}) -> {ok, connection()} | {error, any()}
Connect partition leader.
decode_batches(Bin::binary()) -> batch_decode_result()
The message-set is not decoded upon receiving (in connection process).
It is passed as binary to the consumer process and decoded there.
Return ?incomplete_batch(ExpectedSize) if the fetch size is not big
enough for even one single message. Otherwise return [{Meta, Messages}]
where Meta is either ?KPRO_NO_BATCH_META for magic-version 0-1 or
kpro:batch_meta() for magic-version 2 or above.
discover_coordinator(Connection::connection(), Type::coordinator_type(), Id::group_id() | transactional_id(), Timeout::timeout()) -> {ok, endpoint()} | {error, any()}
Discover group or transactional coordinator.
An implicit step performed in connect_coordinator.
This is useful when the caller wants to re-use an already established
connection towards the discovered endpoint.
discover_partition_leader(Connection::connection(), Topic::topic(), Partition::partition(), Timeout::timeout()) -> {ok, endpoint()} | {error, any()}
Discover partition leader broker endpoint.
An implicit step performed in connect_partition_leader.
This is useful when the caller wants to re-use an already established
connection towards the discovered endpoint.
encode_batch(Magic::magic(), Batch::batch_input(), Compression::compress_option()) -> binary()
Encode message batch for produce request.
encode_request(ClientId::client_id(), CorrId::corr_id(), Req::req()) -> iodata()
Encode request to byte stream.
find(Field::field_name(), Struct::struct()) -> field_value()
See also: kpro_lib:find/2.
find(Field::field_name(), Struct::struct(), Default::field_value()) -> field_value()
See also: kpro_lib:find/3.
get_api_versions(Connection::connection()) -> {ok, vsn_ranges()} | {error, any()}
Query API versions using the given kpro_connection pid.
get_api_vsn_range(Connection::connection(), API::api()) -> {ok, vsn_range()} | {error, any()}
Get version range for the given API.
make_request/3: Helper function to make a request. See also kpro_req_lib for more helper functions.
parse_endpoints(String) -> any()
See also: kpro_lib:parse_endpoints/2.
parse_endpoints(Protocol::protocol() | undefined, String::string()) -> [endpoint()]
See also: kpro_lib:parse_endpoints/2.
provide_compression(Modules::[{snappy | lz4, module()}]) -> ok
Set snappy or lz4 compression modules.
This should override the default usage of snappyer and lz4b_frame.
request_async(ConnectionPid::connection(), Request::req()) -> ok | {error, any()}
Send a request without waiting for reply.
Reply will be delivered to caller in the future when response message is
received from kafka.
The message to expect should have spec {msg, connection(), #kpro_rsp{}}
where #kpro_rsp.ref matches the sent Request#kpro_req.ref.
When it is a produce request with required_acks=0, there will be no reply.
request_sync(ConnectionPid::pid(), Request::req(), Timeout::timeout()) -> ok | {ok, rsp()} | {error, any()}
Send a request, wait for response.
Immediately return 'ok' if it is a produce request with required_acks=0.
send(ConnectionPid::connection(), Request::req()) -> ok
Same as request_async/2, except that the message towards the connection process is a cast (not a call). Always returns 'ok'.
txn_abort(TxnCtx::txn_ctx()) -> ok | {error, any()}
Abort transaction.
txn_abort(TxnCtx::txn_ctx(), Opts::#{timeout => timeout()}) -> ok | {error, any()}
Abort transaction.
txn_commit(TxnCtx::txn_ctx()) -> ok | {error, any()}
Commit transaction.
txn_commit(TxnCtx::txn_ctx(), Opts::#{timeout => timeout()}) -> ok | {error, any()}
Commit transaction.
txn_init_ctx(Connection::connection(), TxnId::transactional_id()) -> {ok, txn_ctx()} | {error, any()}
Initialize a transaction context, the connection should be established
towards transactional coordinator broker.
By default the request timeout (timeout) is 5 seconds. This is for the client
to abort waiting for a response and consider it an error {error, timeout}.
Transaction timeout (txn_timeout) is -1 by default, which means use the kafka
broker setting. The default timeout in kafka broker is 1 minute.
txn_timeout is for the kafka transaction coordinator to abort the transaction if
a transaction did not end (commit or abort) in time.
txn_init_ctx(Connection::connection(), TxnId::transactional_id(), Opts::#{timeout => timeout(), txn_timeout => pos_integer()}) -> {ok, txn_ctx()} | {error, any()}
Initialize a transaction context, the connection should be established towards transactional coordinator broker.
txn_offset_commit(GrpConnection::connection(), GrpId::group_id(), TxnCtx::txn_ctx(), Offsets::offsets_to_commit()) -> ok | {error, any()}
see txn_offset_commit/5
txn_offset_commit(GrpConnection::connection(), GrpId::group_id(), TxnCtx::txn_ctx(), Offsets::offsets_to_commit(), Opts::#{timeout => timeout(), user_data => binary()}) -> ok | {error, any()}
Send transactional offset commit request to group coordinator.
user_data in Opts is used as the default user-data in the offset commit request
if there is no user-data associated with an offset in offsets_to_commit().
txn_send_cg(TxnCtx, CgId) -> any()
See also: txn_send_cg/3.
txn_send_cg(TxCtx::txn_ctx(), CgId::group_id(), Opts::#{timeout => timeout()}) -> ok | {error, any()}
Add consumed offsets to transaction. This is done by sending the consumer group ID to transaction coordinator.
txn_send_partitions(TxnCtx::txn_ctx(), TPL::[{topic(), partition()}]) -> ok | {error, any()}
See also: txn_send_partitions/3.
txn_send_partitions(TxnCtx::txn_ctx(), TPL::[{topic(), partition()}], Opts::#{timeout => timeout()}) -> ok | {error, any()}
Add partitions to transaction.
Generated by EDoc