Module kpro

Data Types

api()

api() = atom()

batch()

batch() = {batch_meta(), [message()]}

batch_decode_result()

batch_decode_result() = {incomplete_batch, int32()} | [batch()]

(Describes offset_ud(): an offset, or an offset plus associated user-data, to commit.)

batch_input()

batch_input() = [msg_input()]

batch_meta()

batch_meta() = undefined | #{batch_meta_key() => batch_meta_val()}

batch_meta_key()

batch_meta_key() = is_transaction | is_control | last_offset | max_ts | producer_id

batch_meta_val()

batch_meta_val() = boolean() | atom() | integer()

bytes()

bytes() = binary()

client_id()

client_id() = binary()

compress_option()

compress_option() = no_compression | gzip | snappy | lz4

conn_config()

conn_config() = kpro_connection:config()

connection()

connection() = kpro_connection:connection()

coordinator_type()

coordinator_type() = group | txn

corr_id()

corr_id() = int32()

count()

count() = non_neg_integer()

decode_fun()

decode_fun() = fun((binary()) -> {field_value(), binary()})

endpoint()

endpoint() = {hostname(), portnum()}

error_code()

error_code() = int16() | atom()

fetch_opts()

fetch_opts() = kpro_req_lib:fetch_opts()

field_name()

field_name() = atom() | field_tag()

field_tag()

field_tag() = integer()

field_value()

field_value() = primitive() | [primitive()] | struct() | [struct()]

group_id()

group_id() = binary()

header_key()

header_key() = binary()

header_val()

header_val() = binary()

headers()

headers() = [{header_key(), header_val()}]

hostname()

hostname() = binary() | inet:hostname()

incomplete_batch()

incomplete_batch() = {incomplete_batch, int32()}

int16()

int16() = -32768..32767

int32()

int32() = -2147483648..2147483647

int64()

int64() = -9223372036854775808..9223372036854775807

int8()

int8() = -128..127

isolation_level()

isolation_level() = read_committed | read_uncommitted

key()

key() = binary()

leader_epoch()

leader_epoch() = int32()

magic()

magic() = 0..2

message()

message() = #kafka_message{offset = kpro:offset(), key = kpro:bytes(), value = kpro:bytes(), ts_type = undefined | kpro:timestamp_type(), ts = undefined | kpro:int64(), headers = kpro:headers()}

msg_input()

msg_input() = #{headers => headers(), ts => msg_ts(), key => key(), value => value()}

msg_ts()

msg_ts() = int64()

offset()

offset() = int64()

offset_ud()

offset_ud() = offset() | {offset(), binary()}

offsets_to_commit()

offsets_to_commit() = #{{topic(), partition()} => offset_ud()} | [{{topic(), partition()}, offset_ud()}]
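The two accepted shapes of offsets_to_commit() can be illustrated with a small sketch. The module name, topic name, offsets and user-data below are made up for illustration, and to_list/1 is a hypothetical helper, not part of kpro.

```erlang
-module(offsets_example).
-export([map_form/0, list_form/0, to_list/1]).

%% Map form: {Topic, Partition} => offset_ud().
%% Partition 0 commits a bare offset; partition 1 commits an
%% offset together with associated user-data.
map_form() ->
    #{{<<"orders">>, 0} => 42,
      {<<"orders">>, 1} => {17, <<"checkpoint-a">>}}.

%% List form carrying the same information.
list_form() ->
    [{{<<"orders">>, 0}, 42},
     {{<<"orders">>, 1}, {17, <<"checkpoint-a">>}}].

%% Hypothetical helper: normalize either form to a sorted list.
to_list(Offsets) when is_map(Offsets) ->
    lists:sort(maps:to_list(Offsets));
to_list(Offsets) when is_list(Offsets) ->
    lists:sort(Offsets).
```

Either value could then be passed as the Offsets argument of txn_offset_commit/4 or txn_offset_commit/5.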

partition()

partition() = int32()

portnum()

portnum() = non_neg_integer()

primitive()

primitive() = integer() | string() | binary() | atom()

primitive_type()

primitive_type() = boolean | int8 | int16 | int32 | int64 | varint | string | nullable_string | bytes | records | unsigned_varint

produce_opts()

produce_opts() = #{compression => compress_option(), required_acks => required_acks(), ack_timeout => wait(), txn_ctx => txn_ctx(), first_sequence => seqno()}

producer_epoch()

producer_epoch() = int16()

producer_id()

producer_id() = int64()

protocol()

protocol() = plaintext | ssl | sasl_plaintext | sasl_ssl

req()

req() = #kpro_req{ref = reference(), api = kpro:api(), vsn = kpro:vsn(), no_ack = boolean(), msg = iodata() | kpro:struct()}

required_acks()

required_acks() = -1..1 | all_isr | none | leader_only

rsp()

rsp() = #kpro_rsp{ref = false | reference(), api = kpro:api(), vsn = kpro:vsn(), msg = binary() | kpro:struct()}

schema()

schema() = primitive_type() | struct_schema() | {array, schema()} | decode_fun()

decode_fun() is a caller-defined decoder.

seqno()

seqno() = integer()

(Describes produce_opts(): optional arguments for making a produce request.)

stack()

stack() = [{api(), vsn()} | field_name()]

encode / decode stack

str()

str() = undefined | string() | binary()

struct()

struct() = #{field_name() => field_value()} | [{field_name(), field_value()}]

struct_schema()

struct_schema() = [{field_name(), schema()}]

timestamp_type()

timestamp_type() = undefined | create | append

topic()

topic() = binary()

transactional_id()

transactional_id() = binary()

txn_ctx()

txn_ctx() = #{connection => connection(), transactional_id => transactional_id(), producer_id => producer_id(), producer_epoch => producer_epoch()}

value()

value() = binary()

vsn()

vsn() = non_neg_integer()

vsn_range()

vsn_range() = {vsn(), vsn()}

vsn_ranges()

vsn_ranges() = #{api() => vsn_range()}

wait()

wait() = non_neg_integer()

Function Index

close_connection/1              Stop connection process.
connect/2                       Connect to the given endpoint.
connect_any/2                   Connect to any of the endpoints in the given list.
connect_controller/2
connect_controller/3            Connect to the controller broker of the kafka cluster.
connect_coordinator/3           Connect to group or transaction coordinator.
connect_partition_leader/4      Connect partition leader.
connect_partition_leader/5      Connect partition leader.
decode_batches/1                The message-set is not decoded upon receiving (in connection process).
discover_coordinator/4          Discover group or transactional coordinator.
discover_partition_leader/4     Discover partition leader broker endpoint.
encode_batch/3                  Encode message batch for produce request.
encode_request/3                Encode request to byte stream.
find/2
find/3
get_api_versions/1              Query API versions using the given kpro_connection pid.
get_api_vsn_range/2             Get version range for the given API.
make_request/3                  Helper function to make a request.
parse_endpoints/1
parse_endpoints/2
produce_api_vsn_to_magic_vsn/1
provide_compression/1           Set snappy or lz4 compression modules.
request_async/2                 Send a request without waiting for reply.
request_sync/3                  Send a request, wait for response.
send/2                          Same as request_async/2.
txn_abort/1                     Abort transaction.
txn_abort/2                     Abort transaction.
txn_commit/1                    Commit transaction.
txn_commit/2                    Commit transaction.
txn_init_ctx/2                  Initialize a transaction context; the connection should be established towards the transactional coordinator broker.
txn_init_ctx/3                  Initialize a transaction context; the connection should be established towards the transactional coordinator broker.
txn_offset_commit/4             See txn_offset_commit/5.
txn_offset_commit/5             Send transactional offset commit request to group coordinator.
txn_send_cg/2
txn_send_cg/3                   Add consumed offsets to transaction.
txn_send_partitions/2
txn_send_partitions/3           Add partitions to transaction.

Function Details

close_connection/1

close_connection(Connection::connection()) -> ok

Stop connection process.

connect/2

connect(Endpoint::endpoint(), ConnConfig::conn_config()) -> {ok, connection()} | {error, any()}

Connect to the given endpoint. NOTE: Connection process is linked to caller unless nolink => true is set in connection config.

connect_any/2

connect_any(Endpoints::[endpoint()], ConnConfig::conn_config()) -> {ok, connection()} | {error, any()}

Connect to any of the endpoints in the given list. NOTE: Connection process is linked to caller unless nolink => true is set in connection config.

connect_controller/2

connect_controller(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config()) -> {ok, connection()} | {error, any()}

See also: connect_controller/3.

connect_controller/3

connect_controller(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Opts::#{timeout => timeout()}) -> {ok, connection()} | {error, any()}

Connect to the controller broker of the kafka cluster.

connect_coordinator/3

connect_coordinator(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Args::#{type => coordinator_type(), id => binary(), timeout => timeout()}) -> {ok, connection()} | {error, any()}

Connect to group or transaction coordinator. If the first argument is not a connection pid but a list of bootstrapping endpoints, it will first try to connect to any of the nodes. NOTE: the 'txn' type is only applicable to kafka 0.11 or later.

connect_partition_leader/4

connect_partition_leader(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Topic::topic(), Partition::partition()) -> {ok, connection()} | {error, any()}

Connect partition leader. If the first argument is not an already established metadata connection but a list of bootstrapping endpoints, this function will first try to establish a temporary connection to any of the bootstrapping endpoints, then send a metadata request to discover the partition leader broker, and finally connect to the leader broker. NOTE: Connection process is linked to caller unless nolink => true is set in connection config.
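A minimal sketch of the bootstrap-list form of this call follows. The module name, the endpoint addresses and the with_leader/3 wrapper are hypothetical, and the empty connection config assumes that default (plaintext) settings are acceptable for the cluster.

```erlang
-module(leader_connect_example).
-export([bootstrap/0, with_leader/3]).

%% Hypothetical bootstrap endpoints; replace with real broker addresses.
bootstrap() ->
    [{"localhost", 9092}, {"localhost", 9093}].

%% Connect to the leader of Topic/Partition, run Fun(Connection),
%% then always stop the connection process.
with_leader(Topic, Partition, Fun) ->
    %% Assumption: default connection settings. Without nolink => true
    %% the connection process is linked to the calling process.
    ConnConfig = #{},
    case kpro:connect_partition_leader(bootstrap(), ConnConfig,
                                       Topic, Partition) of
        {ok, Conn} ->
            try
                Fun(Conn)
            after
                kpro:close_connection(Conn)
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```

A caller might use it as leader_connect_example:with_leader(<<"orders">>, 0, fun(Conn) -> kpro:get_api_versions(Conn) end), where the topic name is again only an example.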

connect_partition_leader/5

connect_partition_leader(Bootstrap::connection() | [endpoint()], ConnConfig::conn_config(), Topic::topic(), Partition::partition(), Opts::#{timeout => timeout()}) -> {ok, connection()} | {error, any()}

Connect partition leader.

decode_batches/1

decode_batches(Bin::binary()) -> batch_decode_result()

The message-set is not decoded upon receiving (in connection process). It is passed as binary to the consumer process and decoded there. Returns ?incomplete_batch(ExpectedSize) if the fetch size is not big enough for even a single message. Otherwise returns [{Meta, Messages}], where Meta is either ?KPRO_NO_BATCH_META for magic-version 0-1 or kpro:batch_meta() for magic-version 2 or above.
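The two result shapes can be handled as in the sketch below. It relies only on the batch_decode_result() type given earlier, that is {incomplete_batch, int32()} or a list of {Meta, Messages} pairs; the module name and the need_bytes tag are illustrative choices, not part of kpro.

```erlang
-module(decode_example).
-export([summarize/1, decode_and_summarize/1]).

%% Classify a batch_decode_result().
%% Incomplete batch: report how many bytes a retry should ask for.
summarize({incomplete_batch, ExpectedSize}) ->
    {need_bytes, ExpectedSize};
%% Otherwise: count the messages across all decoded batches.
summarize(Batches) when is_list(Batches) ->
    Count = lists:sum([length(Messages) || {_Meta, Messages} <- Batches]),
    {ok, Count}.

%% Decode a raw message-set binary (for example, one taken from a
%% fetch response) and summarize the result.
decode_and_summarize(Bin) when is_binary(Bin) ->
    summarize(kpro:decode_batches(Bin)).
```

On {need_bytes, Size} a consumer would typically retry the fetch with a maximum size of at least Size bytes.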

discover_coordinator/4

discover_coordinator(Connection::connection(), Type::coordinator_type(), Id::group_id() | transactional_id(), Timeout::timeout()) -> {ok, endpoint()} | {error, any()}

Discover group or transactional coordinator. An implicit step performed in connect_coordinator. This is useful when the caller wants to re-use an already established connection towards the discovered endpoint.
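The connection re-use mentioned here might look like the following sketch. The pool, a plain map from endpoint() to connection(), is a hypothetical structure kept by the caller, and the 5000 ms timeout is an arbitrary choice.

```erlang
-module(coordinator_reuse_example).
-export([lookup/2, coordinator_connection/4]).

%% Pool is a hypothetical caller-owned map: endpoint() => connection().
lookup(Endpoint, Pool) ->
    maps:find(Endpoint, Pool).

%% Discover the group coordinator through an existing connection and
%% re-use a pooled connection to it when one exists; otherwise connect
%% and add the new connection to the pool.
coordinator_connection(Conn, GroupId, ConnConfig, Pool) ->
    case kpro:discover_coordinator(Conn, group, GroupId, 5000) of
        {ok, Endpoint} ->
            case lookup(Endpoint, Pool) of
                {ok, Existing} ->
                    {ok, Existing, Pool};
                error ->
                    case kpro:connect(Endpoint, ConnConfig) of
                        {ok, New} -> {ok, New, Pool#{Endpoint => New}};
                        {error, Reason} -> {error, Reason}
                    end
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```

Pool keys are compared as exact terms, so the sketch only finds a match when the stored endpoint has the same hostname representation (string or binary) as the discovered one.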

discover_partition_leader/4

discover_partition_leader(Connection::connection(), Topic::topic(), Partition::partition(), Timeout::timeout()) -> {ok, endpoint()} | {error, any()}

Discover partition leader broker endpoint. An implicit step performed in connect_partition_leader. This is useful when the caller wants to re-use an already established connection towards the discovered endpoint.

encode_batch/3

encode_batch(Magic::magic(), Batch::batch_input(), Compression::compress_option()) -> binary()

Encode message batch for produce request.
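As an illustration, a batch_input() value can be built from msg_input() maps and passed to encode_batch/3. The keys, values and header below are invented sample data, and magic version 2 with no_compression is just one valid combination of the documented argument types.

```erlang
-module(encode_batch_example).
-export([batch/0, encode/0]).

%% Build a batch_input(): a list of msg_input() maps.
%% All keys, values and headers are sample data.
batch() ->
    Now = erlang:system_time(millisecond),
    [#{key => <<"user-1">>, value => <<"signed-up">>, ts => Now},
     #{key => <<"user-2">>, value => <<"logged-in">>, ts => Now,
       headers => [{<<"trace-id">>, <<"abc123">>}]}].

%% Encode the batch with magic version 2 and no compression.
encode() ->
    Magic = 2,
    kpro:encode_batch(Magic, batch(), no_compression).
```

The headers entry is only set on the second message to show that the field is optional in the map.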

encode_request/3

encode_request(ClientId::client_id(), CorrId::corr_id(), Req::req()) -> iodata()

Encode request to byte stream.

find/2

find(Field::field_name(), Struct::struct()) -> field_value()

See also: kpro_lib:find/2.

find/3

find(Field::field_name(), Struct::struct(), Default::field_value()) -> field_value()

See also: kpro_lib:find/3.

get_api_versions/1

get_api_versions(Connection::connection()) -> {ok, vsn_ranges()} | {error, any()}

Query API versions using the given kpro_connection pid.

get_api_vsn_range/2

get_api_vsn_range(Connection::connection(), API::api()) -> {ok, vsn_range()} | {error, any()}

Get version range for the given API.
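One possible use of the returned range is to clamp a preferred request version before building requests. In the sketch below pick_vsn/2 and produce_vsn/2 are hypothetical helpers, and the API name produce is assumed to be the atom used for the produce API.

```erlang
-module(vsn_pick_example).
-export([pick_vsn/2, produce_vsn/2]).

%% Clamp Preferred into the {Min, Max} vsn_range().
pick_vsn({Min, Max}, Preferred) when Min =< Max ->
    max(Min, min(Max, Preferred)).

%% Ask the connection for the produce API range, pick a version,
%% and derive the matching magic version for batch encoding.
produce_vsn(Conn, Preferred) ->
    case kpro:get_api_vsn_range(Conn, produce) of
        {ok, Range} ->
            Vsn = pick_vsn(Range, Preferred),
            {ok, Vsn, kpro:produce_api_vsn_to_magic_vsn(Vsn)};
        {error, Reason} ->
            {error, Reason}
    end.
```

For instance, pick_vsn({0, 3}, 7) evaluates to 3, and pick_vsn({1, 5}, 0) evaluates to 1.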

make_request/3

make_request(Api::api(), Vsn::vsn(), Fields::struct()) -> req()

Helper function to make a request. See also kpro_req_lib for more helper functions.

parse_endpoints/1

parse_endpoints(String) -> any()

See also: kpro_lib:parse_endpoints/2.

parse_endpoints/2

parse_endpoints(Protocol::protocol() | undefined, String::string()) -> [endpoint()]

See also: kpro_lib:parse_endpoints/2.

produce_api_vsn_to_magic_vsn/1

produce_api_vsn_to_magic_vsn(V::vsn()) -> magic()

provide_compression/1

provide_compression(Modules::[{snappy | lz4, module()}]) -> ok

Set snappy or lz4 compression modules. This should override the default usage of snappyer and lz4b_frame.

request_async/2

request_async(ConnectionPid::connection(), Request::req()) -> ok | {error, any()}

Send a request without waiting for reply. Reply will be delivered to caller in the future when response message is received from kafka. The message to expect should have spec {msg, connection(), #kpro_rsp{}} where #kpro_rsp.ref matches the sent Request#kpro_req.ref. When it is a produce request with required_acks=0, there will be no reply.

request_sync/3

request_sync(ConnectionPid::pid(), Request::req(), Timeout::timeout()) -> ok | {ok, rsp()} | {error, any()}

Send a request, wait for response. Immediately return 'ok' if it is a produce request with required_acks=0.

send/2

send(ConnectionPid::connection(), Request::req()) -> ok

Same as request_async/2, except that the message towards the connection process is a cast (not a call). Always returns 'ok'.

txn_abort/1

txn_abort(TxnCtx::txn_ctx()) -> ok | {error, any()}

Abort transaction.

txn_abort/2

txn_abort(TxnCtx::txn_ctx(), Opts::#{timeout => timeout()}) -> ok | {error, any()}

Abort transaction.

txn_commit/1

txn_commit(TxnCtx::txn_ctx()) -> ok | {error, any()}

Commit transaction.

txn_commit/2

txn_commit(TxnCtx::txn_ctx(), Opts::#{timeout => timeout()}) -> ok | {error, any()}

Commit transaction.

txn_init_ctx/2

txn_init_ctx(Connection::connection(), TxnId::transactional_id()) -> {ok, txn_ctx()} | {error, any()}

Initialize a transaction context. The connection should be established towards the transactional coordinator broker. By default the request timeout (timeout) is 5 seconds; when it elapses, the client stops waiting for the response and treats it as an error, {error, timeout}. The transaction timeout (txn_timeout) is -1 by default, which means the kafka broker setting is used. The default timeout in the kafka broker is 1 minute. txn_timeout is for the kafka transaction coordinator to abort a transaction that did not end (commit or abort) in time.

txn_init_ctx/3

txn_init_ctx(Connection::connection(), TxnId::transactional_id(), Opts::#{timeout => timeout(), txn_timeout => pos_integer()}) -> {ok, txn_ctx()} | {error, any()}

Initialize a transaction context, the connection should be established towards transactional coordinator broker.

txn_offset_commit/4

txn_offset_commit(GrpConnection::connection(), GrpId::group_id(), TxnCtx::txn_ctx(), Offsets::offsets_to_commit()) -> ok | {error, any()}

See also: txn_offset_commit/5.

txn_offset_commit/5

txn_offset_commit(GrpConnection::connection(), GrpId::group_id(), TxnCtx::txn_ctx(), Offsets::offsets_to_commit(), Opts::#{timeout => timeout(), user_data => binary()}) -> ok | {error, any()}

Send transactional offset commit request to group coordinator. user_data in Opts is used as the default user-data in the offset commit request for any offset in offsets_to_commit() that has no user-data associated with it.

txn_send_cg/2

txn_send_cg(TxnCtx, CgId) -> any()

See also: txn_send_cg/3.

txn_send_cg/3

txn_send_cg(TxCtx::txn_ctx(), CgId::group_id(), Opts::#{timeout => timeout()}) -> ok | {error, any()}

Add consumed offsets to transaction. This is done by sending the consumer group ID to transaction coordinator.

txn_send_partitions/2

txn_send_partitions(TxnCtx::txn_ctx(), TPL::[{topic(), partition()}]) -> ok | {error, any()}

See also: txn_send_partitions/3.

txn_send_partitions/3

txn_send_partitions(TxnCtx::txn_ctx(), TPL::[{topic(), partition()}], Opts::#{timeout => timeout()}) -> ok | {error, any()}

Add partitions to transaction.
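Taken together, the transaction functions suggest a flow along the following lines. This is a sketch under assumptions rather than a reference implementation: the module name, the 5000 ms timeout, the empty connection config and ProduceFun (a caller-supplied fun that sends the produce requests using the transaction context) are all hypothetical. Failures in the setup steps surface as badmatch errors to keep the sketch short.

```erlang
-module(txn_example).
-export([coordinator_args/1, run/4]).

%% Arguments for connect_coordinator/3 targeting the transaction
%% coordinator of TxnId. The timeout value is an arbitrary choice.
coordinator_args(TxnId) ->
    #{type => txn, id => TxnId, timeout => 5000}.

%% Partitions is a list of {topic(), partition()}.
%% ProduceFun is a hypothetical fun(txn_ctx()) -> ok | {error, any()}.
run(Bootstrap, TxnId, Partitions, ProduceFun) ->
    %% Assumption: default connection settings are acceptable.
    {ok, Conn} = kpro:connect_coordinator(Bootstrap, #{},
                                          coordinator_args(TxnId)),
    try
        {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId),
        %% Register the partitions before producing to them.
        ok = kpro:txn_send_partitions(TxnCtx, Partitions),
        case ProduceFun(TxnCtx) of
            ok ->
                kpro:txn_commit(TxnCtx);
            {error, Reason} ->
                %% Best-effort abort; the original error is returned.
                _ = kpro:txn_abort(TxnCtx),
                {error, Reason}
        end
    after
        kpro:close_connection(Conn)
    end.
```

When consumed offsets should be part of the same transaction, txn_send_cg/3 and txn_offset_commit/5 would be called before the commit step.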


Generated by EDoc