View Source kpro (kafka_protocol v4.1.10)
Summary
Types
offset or offset + associated user-data to commit
caller defined decoder
optional args to make produce request
encode / decode stack
Functions
Stop connection process.
Connect to the given endpoint. NOTE: Connection process is linked to caller unless
nolink => true
is set in connection configConnect to any of the endpoints in the given list. NOTE: Connection process is linked to caller unless
nolink => true
is set in connection configConnect to the controller broker of the kafka cluster.
Connect to group or transaction coordinator. If the first arg is not a connection pid but a list of bootstrapping endpoints, it will first try to connect to any of the nodes NOTE: 'txn' type only applicable to kafka 0.11 or later
Connect partition leader. If the fist arg is not an already established metadata connection but a bootstrapping endpoint list, this function will first try to establish a temp connection to any of the bootstrapping endpoints. Then send metadata request to discover partition leader broker Finally connect to the leader broker. NOTE: Connection process is linked to caller unless
nolink => true
is set in connection config.Connect partition leader.
The message-set is not decoded upon receiving (in connection process). It is passed as binary to the consumer process and decoded there. Return
?incomplete_batch(ExpectedSize)
if the fetch size is not big enough for even one single message. Otherwise return [{Meta, Messages}]
where Meta
is either ?KPRO_NO_BATCH_META
for magic-version 0-1 or kpro:batch_meta()
for magic-version 2 or above.Discover group or transactional coordinator. An implicit step performed in
connect_coordinator
. This is useful when the caller wants to re-use already established towards the discovered endpoint.Discover partition leader broker endpoint. An implicit step performed in
connect_partition_leader
. This is useful when the caller wants to re-use already established towards the discovered endpoint.Encode message batch for produce request.
Encode request to byte stream.
See also: kpro_lib:find/2.
See also: kpro_lib:find/3.
Qury API versions using the given
kpro_connection
pid.Get version range for the given API.
Help function to make a request. See also kpro_req_lib for more help functions.
Set snappy or lz4 compression modules. This should override the default usage of
snappyer
and lz4b_frame
.Send a request without waiting for reply. Reply will be delivered to caller in the future when response message is received from kafka. The message to expect should have spec
{msg, connection(), #kpro_rsp{}}
where #kpro_rsp.ref
matches the sent Request#kpro_req.ref
. When it is a produce request with required_acks=0
, there will be no reply.Send a request, wait for response. Immediately return 'ok' if it is a produce request with
required_acks=0
.Same as @link request_async/2. Only that the message towards connection process is a cast (not a call). Always return 'ok'.
Abort transaction.
Abort transaction.
Commit transaction.
Commit transaction.
Initialize a transaction context, the connection should be established towards transactional coordinator broker. By default the request timeout
timeout
is 5 seconds. This is for client to abort waiting for response and consider it an error {error, timeout}
. Transaction timeout txn_timeout
is -1
by default, which means use kafka broker setting. The default timeout in kafka broker is 1 minute. txn_timeout
is for kafka transaction coordinator to abort transaction if a transaction did not end (commit or abort) in time.Initialize a transaction context, the connection should be established towards transactional coordinator broker.
see txn_offset_commit/5
Send transactional offset commit request to group coordinator.
user_data
in Opts
is used as default user-data in offset commit request if there is no user-data associated with offset in offsets_to_commit()
See also: txn_send_cg/3.
Add consumed offsets to transaction. This is done by sending the consumer group ID to transaction coordinator.
Add partitions to transaction.
Types
-type api() :: atom().
-type batch() :: {batch_meta(), [message()]}.
-type batch_input() :: [msg_input()].
-type batch_meta() :: undefined | #{batch_meta_key() => batch_meta_val()}.
-type batch_meta_key() :: is_transaction | is_control | last_offset | max_ts | producer_id.
-type bytes() :: binary().
-type client_id() :: binary().
-type compress_option() :: no_compression | gzip | snappy | lz4.
-type conn_config() :: kpro_connection:config().
-type connection() :: kpro_connection:connection().
-type coordinator_type() :: group | txn.
-type corr_id() :: int32().
-type count() :: non_neg_integer().
-type decode_fun() :: fun((binary()) -> {field_value(), binary()}).
-type fetch_opts() :: kpro_req_lib:fetch_opts().
-type field_tag() :: integer().
-type group_id() :: binary().
-type header_key() :: binary().
-type header_val() :: binary().
-type headers() :: [{header_key(), header_val()}].
-type hostname() :: binary() | inet:hostname() | inet:ip_address().
-type incomplete_batch() :: {incomplete_batch, int32()}.
-type int8() :: -128..127.
-type int16() :: -32768..32767.
-type int32() :: -2147483648..2147483647.
-type int64() :: -9223372036854775808..9223372036854775807.
-type isolation_level() :: read_committed | read_uncommitted.
-type key() :: binary().
-type leader_epoch() :: int32().
-type magic() :: 0..2.
-type message() :: #kafka_message{offset :: kpro:offset(), key :: kpro:bytes(), value :: kpro:bytes(), ts_type :: undefined | kpro:timestamp_type(), ts :: undefined | kpro:int64(), headers :: kpro:headers()}.
-type msg_ts() :: int64().
-type offset() :: int64().
-type partition() :: int32().
-type portnum() :: non_neg_integer().
-type primitive_type() ::
boolean | int8 | int16 | int32 | int64 | varint | string | nullable_string | bytes | records |
unsigned_varint.
-type produce_opts() :: #{compression => compress_option(), required_acks => required_acks(), ack_timeout => wait(), txn_ctx => txn_ctx(), first_sequence => seqno()}.
-type producer_epoch() :: int16().
-type producer_id() :: int64().
-type protocol() :: plaintext | ssl | sasl_plaintext | sasl_ssl.
-type required_acks() :: -1..1 | all_isr | none | leader_only.
-type rsp() :: #kpro_rsp{ref :: false | reference(), api :: kpro:api(), vsn :: kpro:vsn(), msg :: binary() | kpro:struct()}.
-type schema() :: primitive_type() | struct_schema() | {array, schema()} | decode_fun().
-type seqno() :: integer().
-type stack() :: [{api(), vsn()} | field_name()].
-type struct() :: #{field_name() => field_value()} | [{field_name(), field_value()}].
-type struct_schema() :: [{field_name(), schema()}].
-type timestamp_type() :: undefined | create | append.
-type topic() :: binary().
-type transactional_id() :: binary().
-type txn_ctx() :: #{connection => connection(), transactional_id => transactional_id(), producer_id => producer_id(), producer_epoch => producer_id()}.
-type value() :: binary().
-type vsn() :: non_neg_integer().
-type wait() :: non_neg_integer().
Functions
-spec close_connection(connection()) -> ok.
-spec connect(endpoint(), conn_config()) -> {ok, connection()} | {error, any()}.
nolink => true
is set in connection config
-spec connect_any([endpoint()], conn_config()) -> {ok, connection()} | {error, any()}.
nolink => true
is set in connection config
-spec connect_controller(connection() | [endpoint()], conn_config()) -> {ok, connection()} | {error, any()}.
See also: connect_controller/3.
-spec connect_controller(connection() | [endpoint()], conn_config(), #{timeout => timeout()}) -> {ok, connection()} | {error, any()}.
-spec connect_coordinator(connection() | [endpoint()], conn_config(), #{type => coordinator_type(), id => binary(), timeout => timeout()}) -> {ok, connection()} | {error, any()}.
-spec connect_partition_leader(connection() | [endpoint()], conn_config(), topic(), partition()) -> {ok, connection()} | {error, any()}.
nolink => true
is set in connection config.
Link to this function
connect_partition_leader(Bootstrap, ConnConfig, Topic, Partition, Opts)
View Source-spec connect_partition_leader(connection() | [endpoint()], conn_config(), topic(), partition(), #{timeout => timeout()}) -> {ok, connection()} | {error, any()}.
-spec decode_batches(binary()) -> batch_decode_result().
?incomplete_batch(ExpectedSize)
if the fetch size is not big enough for even one single message. Otherwise return [{Meta, Messages}]
where Meta
is either ?KPRO_NO_BATCH_META
for magic-version 0-1 or kpro:batch_meta()
for magic-version 2 or above.
-spec discover_coordinator(connection(), coordinator_type(), group_id() | transactional_id(), timeout()) -> {ok, endpoint()} | {error, any()}.
connect_coordinator
. This is useful when the caller wants to re-use already established towards the discovered endpoint.
-spec discover_partition_leader(connection(), topic(), partition(), timeout()) -> {ok, endpoint()} | {error, any()}.
connect_partition_leader
. This is useful when the caller wants to re-use already established towards the discovered endpoint.
-spec encode_batch(magic(), batch_input(), compress_option()) -> binary().
-spec find(field_name(), struct()) -> field_value().
See also: kpro_lib:find/2.
-spec find(field_name(), struct(), field_value()) -> field_value().
See also: kpro_lib:find/3.
-spec get_api_versions(connection()) -> {ok, vsn_ranges()} | {error, any()}.
kpro_connection
pid.
-spec get_api_vsn_range(connection(), api()) -> {ok, vsn_range()} | {error, any()}.
See also: kpro_lib:parse_endpoints/2.
See also: kpro_lib:parse_endpoints/2.
-spec provide_compression([{snappy | lz4, module()}]) -> ok.
snappyer
and lz4b_frame
.
-spec request_async(connection(), req()) -> ok | {error, any()}.
{msg, connection(), #kpro_rsp{}}
where #kpro_rsp.ref
matches the sent Request#kpro_req.ref
. When it is a produce request with required_acks=0
, there will be no reply.
required_acks=0
.
-spec send(connection(), req()) -> ok.
-spec txn_init_ctx(connection(), transactional_id()) -> {ok, txn_ctx()} | {error, any()}.
timeout
is 5 seconds. This is for client to abort waiting for response and consider it an error {error, timeout}
. Transaction timeout txn_timeout
is -1
by default, which means use kafka broker setting. The default timeout in kafka broker is 1 minute. txn_timeout
is for kafka transaction coordinator to abort transaction if a transaction did not end (commit or abort) in time.
-spec txn_init_ctx(connection(), transactional_id(), #{timeout => timeout(), txn_timeout => pos_integer()}) -> {ok, txn_ctx()} | {error, any()}.
-spec txn_offset_commit(connection(), group_id(), txn_ctx(), offsets_to_commit()) -> ok | {error, any()}.
-spec txn_offset_commit(connection(), group_id(), txn_ctx(), offsets_to_commit(), #{timeout => timeout(), user_data => binary()}) -> ok | {error, any()}.
user_data
in Opts
is used as default user-data in offset commit request if there is no user-data associated with offset in offsets_to_commit()
See also: txn_send_cg/3.
See also: txn_send_partitions/3.