kpro (kafka_protocol v4.1.10)

Summary

Functions

Stop connection process.
Connect to the given endpoint. NOTE: The connection process is linked to the caller unless nolink => true is set in the connection config.
Connect to any of the endpoints in the given list. NOTE: The connection process is linked to the caller unless nolink => true is set in the connection config.
Connect to the controller broker of the kafka cluster.
Connect to group or transaction coordinator. If the first arg is not a connection pid but a list of bootstrapping endpoints, it will first try to connect to any of the nodes. NOTE: The 'txn' type is only applicable to kafka 0.11 or later.
Connect partition leader. If the first arg is not an already established metadata connection but a bootstrapping endpoint list, this function will first try to establish a temp connection to any of the bootstrapping endpoints, then send a metadata request to discover the partition leader broker, and finally connect to the leader broker. NOTE: The connection process is linked to the caller unless nolink => true is set in the connection config.
The message-set is not decoded upon receiving (in the connection process); it is passed as a binary to the consumer process and decoded there. Returns ?incomplete_batch(ExpectedSize) if the fetch size is not big enough for even one single message. Otherwise returns [{Meta, Messages}] where Meta is either ?KPRO_NO_BATCH_META for magic version 0-1 or kpro:batch_meta() for magic version 2 or above.
Discover group or transactional coordinator. An implicit step performed in connect_coordinator. This is useful when the caller wants to re-use an already established connection towards the discovered endpoint.
Discover partition leader broker endpoint. An implicit step performed in connect_partition_leader. This is useful when the caller wants to re-use an already established connection towards the discovered endpoint.
Encode message batch for produce request.
Encode request to byte stream.
Query API versions using the given kpro_connection pid.
Get version range for the given API.
Helper function to make a request. See also kpro_req_lib for more help functions.
Set snappy or lz4 compression modules. This overrides the default usage of snappyer and lz4b_frame.
Send a request without waiting for the reply. The reply will be delivered to the caller in the future, when the response message is received from kafka. The message to expect has the form {msg, connection(), #kpro_rsp{}}, where #kpro_rsp.ref matches the sent Request#kpro_req.ref. When it is a produce request with required_acks=0, there will be no reply.
Send a request and wait for the response. Returns 'ok' immediately if it is a produce request with required_acks=0.
Same as request_async/2, except that the message towards the connection process is a cast (not a call). Always returns 'ok'.
Abort transaction.
Abort transaction.
Commit transaction.
Commit transaction.
Initialize a transaction context; the connection should be established towards the transactional coordinator broker. By default the request timeout (the timeout option) is 5 seconds; this is how long the client waits for the response before giving up with {error, timeout}. The transaction timeout (the txn_timeout option) is -1 by default, which means use the kafka broker setting; the broker default is 1 minute. txn_timeout is how long the kafka transaction coordinator waits before aborting a transaction which has not ended (committed or aborted) in time.
Initialize a transaction context; the connection should be established towards the transactional coordinator broker.
Send transactional offset commit request to the group coordinator. user_data in Opts is used as the default user-data in the offset commit request when there is no user-data associated with an offset in offsets_to_commit().
Add consumed offsets to the transaction. This is done by sending the consumer group ID to the transaction coordinator.
Add partitions to transaction.

Types

-type api() :: atom().
-type batch() :: {batch_meta(), [message()]}.
-type batch_decode_result() :: {incomplete_batch, int32()} | [batch()].
-type batch_input() :: [msg_input()].
-type batch_meta() :: undefined | #{batch_meta_key() => batch_meta_val()}.
-type batch_meta_key() :: is_transaction | is_control | last_offset | max_ts | producer_id.
-type batch_meta_val() :: boolean() | atom() | integer().
-type bytes() :: binary().
-type client_id() :: binary().
-type compress_option() :: no_compression | gzip | snappy | lz4.
-type conn_config() :: kpro_connection:config().
-type connection() :: kpro_connection:connection().
-type coordinator_type() :: group | txn.
-type corr_id() :: int32().
-type count() :: non_neg_integer().
-type decode_fun() :: fun((binary()) -> {field_value(), binary()}).
-type endpoint() :: {hostname(), portnum()}.
-type error_code() :: int16() | atom().
-type fetch_opts() :: kpro_req_lib:fetch_opts().
-type field_name() :: atom() | field_tag().
-type field_tag() :: integer().
-type field_value() :: primitive() | [primitive()] | struct() | [struct()].
-type group_id() :: binary().
-type header_key() :: binary().
-type header_val() :: binary().
-type headers() :: [{header_key(), header_val()}].
-type hostname() :: binary() | inet:hostname() | inet:ip_address().
-type incomplete_batch() :: {incomplete_batch, int32()}.
-type int8() :: -128..127.
-type int16() :: -32768..32767.
-type int32() :: -2147483648..2147483647.
-type int64() :: -9223372036854775808..9223372036854775807.
-type isolation_level() :: read_committed | read_uncommitted.
-type key() :: binary().
-type leader_epoch() :: int32().
-type magic() :: 0..2.
-type message() ::
          #kafka_message{offset :: kpro:offset(),
                         key :: kpro:bytes(),
                         value :: kpro:bytes(),
                         ts_type :: undefined | kpro:timestamp_type(),
                         ts :: undefined | kpro:int64(),
                         headers :: kpro:headers()}.
-type msg_input() :: #{headers => headers(), ts => msg_ts(), key => key(), value => value()}.
-type msg_ts() :: int64().
-type offset() :: int64().
-type offset_ud() :: offset() | {offset(), binary()}.
offset or offset + associated user-data to commit
-type offsets_to_commit() ::
          #{{topic(), partition()} => offset_ud()} | [{{topic(), partition()}, offset_ud()}].
-type partition() :: int32().
-type portnum() :: non_neg_integer().
-type primitive() :: integer() | string() | binary() | atom().
-type primitive_type() ::
          boolean | int8 | int16 | int32 | int64 | varint | string | nullable_string | bytes | records |
          unsigned_varint.
-type produce_opts() ::
          #{compression => compress_option(),
            required_acks => required_acks(),
            ack_timeout => wait(),
            txn_ctx => txn_ctx(),
            first_sequence => seqno()}.
optional args to make produce request
-type producer_epoch() :: int16().
-type producer_id() :: int64().
-type protocol() :: plaintext | ssl | sasl_plaintext | sasl_ssl.
-type req() ::
          #kpro_req{ref :: reference(),
                    api :: kpro:api(),
                    vsn :: kpro:vsn(),
                    no_ack :: boolean(),
                    msg :: iodata() | kpro:struct()}.
-type required_acks() :: -1..1 | all_isr | none | leader_only.
-type rsp() ::
          #kpro_rsp{ref :: false | reference(),
                    api :: kpro:api(),
                    vsn :: kpro:vsn(),
                    msg :: binary() | kpro:struct()}.
-type schema() :: primitive_type() | struct_schema() | {array, schema()} | decode_fun().
caller defined decoder
-type seqno() :: integer().
-type stack() :: [{api(), vsn()} | field_name()].
encode / decode stack
-type str() :: undefined | string() | binary().
-type struct() :: #{field_name() => field_value()} | [{field_name(), field_value()}].
-type struct_schema() :: [{field_name(), schema()}].
-type timestamp_type() :: undefined | create | append.
-type topic() :: binary().
-type transactional_id() :: binary().
-type txn_ctx() ::
          #{connection => connection(),
            transactional_id => transactional_id(),
            producer_id => producer_id(),
            producer_epoch => producer_epoch()}.
-type value() :: binary().
-type vsn() :: non_neg_integer().
-type vsn_range() :: {vsn(), vsn()}.
-type vsn_ranges() :: #{api() => vsn_range()}.
-type wait() :: non_neg_integer().

Functions

close_connection(Connection)
-spec close_connection(connection()) -> ok.
Stop connection process.

connect(Endpoint, ConnConfig)
-spec connect(endpoint(), conn_config()) -> {ok, connection()} | {error, any()}.
Connect to the given endpoint. NOTE: The connection process is linked to the caller unless nolink => true is set in the connection config.

connect_any(Endpoints, ConnConfig)
-spec connect_any([endpoint()], conn_config()) -> {ok, connection()} | {error, any()}.
Connect to any of the endpoints in the given list. NOTE: The connection process is linked to the caller unless nolink => true is set in the connection config.
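
For illustration, a minimal sketch of connecting via a bootstrap endpoint (the host, port and nolink setting are example values):

  Endpoints = [{"localhost", 9092}],
  ConnConfig = #{nolink => true},  %% do not link the connection process to the caller
  {ok, Conn} = kpro:connect_any(Endpoints, ConnConfig),
  %% ... send requests over Conn ...
  ok = kpro:close_connection(Conn).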

connect_controller(Bootstrap, ConnConfig)
-spec connect_controller(connection() | [endpoint()], conn_config()) ->
                            {ok, connection()} | {error, any()}.

See also: connect_controller/3.


connect_controller(Bootstrap, ConnConfig, Opts)
-spec connect_controller(connection() | [endpoint()], conn_config(), #{timeout => timeout()}) ->
                            {ok, connection()} | {error, any()}.
Connect to the controller broker of the kafka cluster.

connect_coordinator(Bootstrap, ConnConfig, Args)
-spec connect_coordinator(connection() | [endpoint()],
                          conn_config(),
                          #{type => coordinator_type(), id => binary(), timeout => timeout()}) ->
                             {ok, connection()} | {error, any()}.
Connect to group or transaction coordinator. If the first arg is not a connection pid but a list of bootstrapping endpoints, it will first try to connect to any of the nodes. NOTE: The 'txn' type is only applicable to kafka 0.11 or later.
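
For example, assuming a (hypothetical) consumer group <<"my-group">> and a local bootstrap endpoint:

  Args = #{type => group, id => <<"my-group">>, timeout => 5000},
  {ok, CoordConn} = kpro:connect_coordinator([{"localhost", 9092}], #{}, Args).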

connect_partition_leader(Bootstrap, ConnConfig, Topic, Partition)
-spec connect_partition_leader(connection() | [endpoint()], conn_config(), topic(), partition()) ->
                                  {ok, connection()} | {error, any()}.
Connect partition leader. If the first arg is not an already established metadata connection but a bootstrapping endpoint list, this function will first try to establish a temp connection to any of the bootstrapping endpoints, then send a metadata request to discover the partition leader broker, and finally connect to the leader broker. NOTE: The connection process is linked to the caller unless nolink => true is set in the connection config.
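
For example (endpoint, topic and partition are placeholders):

  {ok, LeaderConn} =
    kpro:connect_partition_leader([{"localhost", 9092}], #{}, <<"my-topic">>, 0).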

connect_partition_leader(Bootstrap, ConnConfig, Topic, Partition, Opts)
-spec connect_partition_leader(connection() | [endpoint()],
                               conn_config(),
                               topic(),
                               partition(),
                               #{timeout => timeout()}) ->
                                  {ok, connection()} | {error, any()}.
Connect partition leader.

decode_batches(Bin)
-spec decode_batches(binary()) -> batch_decode_result().
The message-set is not decoded upon receiving (in the connection process); it is passed as a binary to the consumer process and decoded there. Returns ?incomplete_batch(ExpectedSize) if the fetch size is not big enough for even one single message. Otherwise returns [{Meta, Messages}] where Meta is either ?KPRO_NO_BATCH_META for magic version 0-1 or kpro:batch_meta() for magic version 2 or above.
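
A sketch of decoding the records binary taken from a fetch response partition; handle_fetched/1 is a hypothetical helper, and the #kafka_message{} record plus the ?incomplete_batch macro come from kpro.hrl:

  -include_lib("kafka_protocol/include/kpro.hrl").

  handle_fetched(RecordsBin) ->
    case kpro:decode_batches(RecordsBin) of
      ?incomplete_batch(ExpectedSize) ->
        %% fetch again with max_bytes >= ExpectedSize
        {error, {max_bytes_too_small, ExpectedSize}};
      Batches ->
        %% flatten [{Meta, Messages}] into a flat list of #kafka_message{} records
        [Msg || {_Meta, Msgs} <- Batches, Msg <- Msgs]
    end.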

discover_coordinator(Connection, Type, Id, Timeout)
-spec discover_coordinator(connection(), coordinator_type(), group_id() | transactional_id(), timeout()) ->
                              {ok, endpoint()} | {error, any()}.
Discover group or transactional coordinator. An implicit step performed in connect_coordinator. This is useful when the caller wants to re-use an already established connection towards the discovered endpoint.
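
For example, re-using an existing connection Conn to locate the group coordinator (group ID and timeout are placeholders):

  {ok, {Host, Port}} = kpro:discover_coordinator(Conn, group, <<"my-group">>, 5000),
  {ok, CoordConn} = kpro:connect({Host, Port}, #{}).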

discover_partition_leader(Connection, Topic, Partition, Timeout)
-spec discover_partition_leader(connection(), topic(), partition(), timeout()) ->
                                   {ok, endpoint()} | {error, any()}.
Discover partition leader broker endpoint. An implicit step performed in connect_partition_leader. This is useful when the caller wants to re-use an already established connection towards the discovered endpoint.

encode_batch(Magic, Batch, Compression)
-spec encode_batch(magic(), batch_input(), compress_option()) -> binary().
Encode message batch for produce request.
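
A sketch of encoding a single-message batch with magic version 2 (message content is a placeholder; see produce_api_vsn_to_magic_vsn/1 for mapping a produce API version to a magic version):

  Batch = [#{ts => erlang:system_time(millisecond),
             key => <<"key1">>,
             value => <<"value1">>,
             headers => []}],
  EncodedBin = kpro:encode_batch(2, Batch, no_compression).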

encode_request(ClientId, CorrId, Req)
-spec encode_request(client_id(), corr_id(), req()) -> iodata().
Encode request to byte stream.

find(Field, Struct)
-spec find(field_name(), struct()) -> field_value().

See also: kpro_lib:find/2.


find(Field, Struct, Default)
-spec find(field_name(), struct(), field_value()) -> field_value().

See also: kpro_lib:find/3.


get_api_versions(Connection)
-spec get_api_versions(connection()) -> {ok, vsn_ranges()} | {error, any()}.
Query API versions using the given kpro_connection pid.

get_api_vsn_range(Connection, API)
-spec get_api_vsn_range(connection(), api()) -> {ok, vsn_range()} | {error, any()}.
Get version range for the given API.
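
For example, picking the highest produce version supported by the connected broker and mapping it to a magic version (one possible strategy, not the only one):

  {ok, {_MinVsn, MaxVsn}} = kpro:get_api_vsn_range(Conn, produce),
  Magic = kpro:produce_api_vsn_to_magic_vsn(MaxVsn).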

make_request(Api, Vsn, Fields)
-spec make_request(api(), vsn(), struct()) -> req().
Helper function to make a request. See also kpro_req_lib for more help functions.
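
A small example; the field names must follow the Kafka protocol schema for the chosen API and version (metadata v0 takes only a list of topic names, api_versions v0 takes no fields):

  MetadataReq = kpro:make_request(metadata, 0, #{topics => [<<"my-topic">>]}),
  ApiVersionsReq = kpro:make_request(api_versions, 0, []).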


parse_endpoints(Protocol, String)
-spec parse_endpoints(protocol() | undefined, string()) -> [endpoint()].

See also: kpro_lib:parse_endpoints/2.
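
For example, assuming the common comma-separated host:port form (host names are placeholders):

  Endpoints = kpro:parse_endpoints(undefined, "kafka-1:9092,kafka-2:9092"),
  {ok, Conn} = kpro:connect_any(Endpoints, #{}).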


produce_api_vsn_to_magic_vsn(V)
-spec produce_api_vsn_to_magic_vsn(vsn()) -> magic().

provide_compression(Modules)
-spec provide_compression([{snappy | lz4, module()}]) -> ok.
Set snappy or lz4 compression modules. This overrides the default usage of snappyer and lz4b_frame.
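
A hedged sketch; my_snappy is a hypothetical module assumed to implement the same compress/decompress interface as the default snappyer-based implementation:

  ok = kpro:provide_compression([{snappy, my_snappy}]).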

request_async(ConnectionPid, Request)
-spec request_async(connection(), req()) -> ok | {error, any()}.
Send a request without waiting for the reply. The reply will be delivered to the caller in the future, when the response message is received from kafka. The message to expect has the form {msg, connection(), #kpro_rsp{}}, where #kpro_rsp.ref matches the sent Request#kpro_req.ref. When it is a produce request with required_acks=0, there will be no reply.
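
A sketch of the asynchronous round trip, assuming kpro.hrl is included for the #kpro_req{} and #kpro_rsp{} records, Conn is an established connection and Req was built with make_request/3:

  ok = kpro:request_async(Conn, Req),
  Ref = Req#kpro_req.ref,
  receive
    {msg, Conn, #kpro_rsp{ref = Ref} = Rsp} ->
      {ok, Rsp}
  after 5000 ->
    {error, timeout}
  end.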

request_sync(ConnectionPid, Request, Timeout)
-spec request_sync(pid(), req(), timeout()) -> ok | {ok, rsp()} | {error, any()}.
Send a request and wait for the response. Returns 'ok' immediately if it is a produce request with required_acks=0.
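
For example, querying api_versions synchronously over an established connection Conn (kpro.hrl provides the #kpro_rsp{} record):

  Req = kpro:make_request(api_versions, 0, []),
  {ok, #kpro_rsp{msg = Msg}} = kpro:request_sync(Conn, Req, 5000).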

send(ConnectionPid, Request)
-spec send(connection(), req()) -> ok.
Same as request_async/2, except that the message towards the connection process is a cast (not a call). Always returns 'ok'.

txn_abort(TxnCtx)
-spec txn_abort(txn_ctx()) -> ok | {error, any()}.
Abort transaction.

txn_abort(TxnCtx, Opts)
-spec txn_abort(txn_ctx(), #{timeout => timeout()}) -> ok | {error, any()}.
Abort transaction.

txn_commit(TxnCtx)
-spec txn_commit(txn_ctx()) -> ok | {error, any()}.
Commit transaction.

txn_commit(TxnCtx, Opts)
-spec txn_commit(txn_ctx(), #{timeout => timeout()}) -> ok | {error, any()}.
Commit transaction.

txn_init_ctx(Connection, TxnId)
-spec txn_init_ctx(connection(), transactional_id()) -> {ok, txn_ctx()} | {error, any()}.
Initialize a transaction context; the connection should be established towards the transactional coordinator broker. By default the request timeout (the timeout option) is 5 seconds; this is how long the client waits for the response before giving up with {error, timeout}. The transaction timeout (the txn_timeout option) is -1 by default, which means use the kafka broker setting; the broker default is 1 minute. txn_timeout is how long the kafka transaction coordinator waits before aborting a transaction which has not ended (committed or aborted) in time.

txn_init_ctx(Connection, TxnId, Opts)
-spec txn_init_ctx(connection(),
                   transactional_id(),
                   #{timeout => timeout(), txn_timeout => pos_integer()}) ->
                      {ok, txn_ctx()} | {error, any()}.
Initialize a transaction context; the connection should be established towards the transactional coordinator broker.

txn_offset_commit(GrpConnection, GrpId, TxnCtx, Offsets)
-spec txn_offset_commit(connection(), group_id(), txn_ctx(), offsets_to_commit()) -> ok | {error, any()}.

See also: txn_offset_commit/5.

txn_offset_commit(GrpConnection, GrpId, TxnCtx, Offsets, Opts)
-spec txn_offset_commit(connection(),
                        group_id(),
                        txn_ctx(),
                        offsets_to_commit(),
                        #{timeout => timeout(), user_data => binary()}) ->
                           ok | {error, any()}.
Send transactional offset commit request to the group coordinator. user_data in Opts is used as the default user-data in the offset commit request when there is no user-data associated with an offset in offsets_to_commit().
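
For example (topic, partitions, offsets and user-data are placeholders; a tuple value carries per-offset user-data):

  Offsets = #{{<<"my-topic">>, 0} => 100,
              {<<"my-topic">>, 1} => {200, <<"per-offset-user-data">>}},
  ok = kpro:txn_offset_commit(GrpConn, <<"my-group">>, TxnCtx, Offsets,
                              #{timeout => 5000, user_data => <<"default-ud">>}).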

txn_send_cg(TxnCtx, CgId)

See also: txn_send_cg/3.


txn_send_cg(TxCtx, CgId, Opts)
-spec txn_send_cg(txn_ctx(), group_id(), #{timeout => timeout()}) -> ok | {error, any()}.
Add consumed offsets to the transaction. This is done by sending the consumer group ID to the transaction coordinator.

txn_send_partitions(TxnCtx, TPL)
-spec txn_send_partitions(txn_ctx(), [{topic(), partition()}]) -> ok | {error, any()}.

See also: txn_send_partitions/3.


txn_send_partitions(TxnCtx, TPL, Opts)
-spec txn_send_partitions(txn_ctx(), [{topic(), partition()}], #{timeout => timeout()}) ->
                             ok | {error, any()}.
Add partitions to transaction.
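
Putting the transactional calls together, a minimal sketch (endpoints, IDs and the topic are placeholders; the produce step is elided because it goes over separate partition-leader connections with txn_ctx set in produce_opts()):

  {ok, CoordConn} = kpro:connect_coordinator([{"localhost", 9092}], #{},
                                             #{type => txn, id => <<"my-txn-id">>}),
  {ok, TxnCtx} = kpro:txn_init_ctx(CoordConn, <<"my-txn-id">>),
  ok = kpro:txn_send_partitions(TxnCtx, [{<<"my-topic">>, 0}]),
  %% ... produce to the partition leaders within the transaction ...
  ok = kpro:txn_commit(TxnCtx),
  ok = kpro:close_connection(CoordConn).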