kpro_connection (kafka_protocol v4.1.10)

Summary

Functions

all_cfg_keys/0: Return all config keys, to make client config management easy.
debug/2: Enable/disable debugging on the socket process. debug(Pid, print) prints debug info to stdout; debug(Pid, File) writes debug info to File; debug(Pid, none) stops debugging.
request_async/2: Send a request. The caller should expect to receive a response whose Rsp#kpro_rsp.ref equals Request#kpro_req.ref, unless Request#kpro_req.no_ack is set to true.
request_sync/3: Send a request and wait for the response for at most Timeout milliseconds.
send/2: Same as request_async/2, except that the message to the connection process is a cast (not a call), unless the request requires no ack from Kafka, in which case a call is used to avoid message overflow. It always returns ok.
start/3: Connect to the given endpoint. The started connection pid is linked to the caller unless nolink := true is found in Config.
stop/1: Stop the socket process.

Types

-type cfg_key() ::
          connect_timeout | client_id | extra_sock_opts | debug | nolink | query_api_versions |
          request_timeout | sasl | ssl.
-type cfg_val() :: term().
-type client_id() :: kpro:client_id().
-type config() :: [{cfg_key(), cfg_val()}] | #{cfg_key() => cfg_val()}.

Connection configuration.

It is a tuple list or map with the following keys (all of them are optional):
  • connect_timeout: timeout (in ms) for the initial connection, defaults to 5 seconds
  • client_id: string representing the client in Kafka, defaults to "kpro-client"
  • extra_sock_opts: extra options passed down to gen_tcp, defaults to []
  • debug: debugging mode, defaults to false
  • nolink: if true, the kpro_connection process is not linked to the caller; defaults to false
  • query_api_versions: whether to query Kafka for supported API versions at the beginning, so that kpro can use newer APIs; the ApiVersionRequest was introduced in Kafka 0.10, so set this to false when using an older version of Kafka; defaults to true
  • request_timeout: timeout (in ms) for the actual request, defaults to 4 minutes
  • sasl: configuration of SASL authentication, can be either {Mechanism, Username, Password} or {Mechanism, File} or undefined, where Mechanism is plain | scram_sha_256 | scram_sha_512, and File is the path to a text file which contains two lines, the first line for the username and the second line for the password; defaults to undefined
  • ssl: whether to use SSL, defaults to false; more information can be found in the brod documentation
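To make the keys above concrete, here is a minimal sketch of a config in map form. Key names follow the cfg_key() type. The module name, client id and credentials path are hypothetical, and passing client_id as a binary is an assumption (check kpro:client_id() for the accepted type).

```erlang
-module(conn_config_example).
-export([config/0]).

%% Hypothetical connection config using only keys listed in cfg_key().
%% Every key is optional; omitted keys fall back to their defaults.
config() ->
    #{connect_timeout => 10000,                % ms, overrides the 5 s default
      client_id => <<"my-app">>,               % assumed binary; hypothetical name
      query_api_versions => true,              % set to false for Kafka older than 0.10
      request_timeout => 60000,                % ms, overrides the 4 min default
      sasl => {plain, "/path/to/credentials"}, % file: username line, then password line
      ssl => true}.
```

Since config() also admits a tuple list, the same settings could be written as [{connect_timeout, 10000}, {ssl, true}] and so on.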
-type connection() :: pid().
-type hostname() :: kpro:hostname().
-type portnum() :: kpro:portnum().
-type requests() :: kpro_sent_reqs:requests().
-type state() ::
          #state{client_id :: client_id(),
                 parent :: pid(),
                 config :: config(),
                 remote :: kpro:endpoint(),
                 sock :: gen_tcp:socket() | ssl:sslsocket(),
                 mod :: undefined | gen_tcp | ssl,
                 req_timeout :: undefined | timeout(),
                 api_vsns :: undefined | kpro:vsn_ranges(),
                 requests :: undefined | requests(),
                 backlog :: false | queue:queue()}.

Functions

-spec all_cfg_keys() -> [cfg_key()].
Return all config keys, to make client config management easy.
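One plausible use of all_cfg_keys/0 is to strip a larger client config down to the entries the connection understands. The sketch below assumes that use; the module and function names are hypothetical, and pick/2 is a pure helper that takes the key list as an argument.

```erlang
-module(cfg_filter).
-export([connection_config/1, pick/2]).

%% Keep only the entries whose key is a connection config key.
connection_config(ClientConfig) ->
    pick(kpro_connection:all_cfg_keys(), ClientConfig).

%% Pure helper that handles both config() shapes (map or tuple list).
pick(Keys, Config) when is_map(Config) ->
    maps:with(Keys, Config);
pick(Keys, Config) when is_list(Config) ->
    [{K, V} || {K, V} <- Config, lists:member(K, Keys)].
```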
-spec debug(connection(), print | string() | none) -> ok.
Enable/disable debugging on the socket process. debug(Pid, print) prints debug info to stdout; debug(Pid, File) writes debug info to File; debug(Pid, none) stops debugging.
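The three modes can be combined into a small helper that enables file debugging around a piece of work and always turns it off afterwards. This is only a sketch; the module name, function name and log path are hypothetical.

```erlang
-module(debug_example).
-export([trace_to_file/3]).

%% Run Fun() while the connection's debug output goes to File,
%% then switch debugging off again even if Fun() raises.
trace_to_file(Conn, File, Fun) when is_list(File), is_function(Fun, 0) ->
    ok = kpro_connection:debug(Conn, File),
    try
        Fun()
    after
        kpro_connection:debug(Conn, none)
    end.
```

For example, debug_example:trace_to_file(Conn, "/tmp/kpro-debug.log", fun() -> do_work(Conn) end), where do_work/1 stands for whatever the caller wants traced. For interactive sessions, kpro_connection:debug(Conn, print) sends the same output to stdout.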

format_status(Opt, Status)

-spec get_api_vsns(pid()) -> {ok, undefined | kpro:vsn_ranges()} | {error, any()}.

get_connect_timeout(Config)

-spec get_connect_timeout(config()) -> timeout().
-spec get_endpoint(pid()) -> {ok, kpro:endpoint()} | {error, any()}.
-spec get_tcp_sock(pid()) -> {ok, port()}.

init(Parent, Host, Port, Config)

-spec init(pid(), hostname(), portnum(), config()) -> no_return().

request_async(Pid, Request)

-spec request_async(connection(), kpro:req()) -> ok | {error, any()}.
Send a request. The caller should expect to receive a response whose Rsp#kpro_rsp.ref equals Request#kpro_req.ref, unless Request#kpro_req.no_ack is set to true.
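The ref matching described here can be sketched as a selective receive. Two things in this example are assumptions rather than documented facts: that the response arrives in the caller's mailbox as {msg, Conn, Rsp}, and that the records are available from kafka_protocol/include/kpro.hrl. The module and function names are hypothetical.

```erlang
-module(async_example).
-export([request_and_wait/3]).

-include_lib("kafka_protocol/include/kpro.hrl").

%% Send Req asynchronously, then wait up to Timeout ms for the reply
%% whose ref matches. ASSUMPTION: replies arrive as {msg, Conn, Rsp}.
request_and_wait(Conn, #kpro_req{ref = Ref, no_ack = NoAck} = Req, Timeout) ->
    case kpro_connection:request_async(Conn, Req) of
        ok when NoAck =:= true ->
            ok;                          % no response is expected
        ok ->
            receive
                {msg, Conn, #kpro_rsp{ref = Ref} = Rsp} -> {ok, Rsp}
            after Timeout ->
                {error, timeout}
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```

In real code the caller would usually do other work between sending and receiving; if it only needs to block, request_sync/3 already covers that case. This sketch also does not monitor the connection process, so a crashed connection shows up only as a timeout.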

request_sync(Pid, Request, Timeout)

-spec request_sync(connection(), kpro:req(), timeout()) -> ok | {ok, kpro:rsp()} | {error, any()}.
Send a request and wait for the response for at most Timeout milliseconds.
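The spec allows three result shapes, and a caller has to handle each. A minimal sketch follows, with a hypothetical wrapper name and an arbitrary 5 second timeout. Reading the bare ok as the result of a request that has no_ack set is an inference from the request_async/2 description, not something this page states.

```erlang
-module(sync_example).
-export([call/2]).

%% Hypothetical wrapper: fixed 5 s timeout, one tagged result per outcome.
call(Conn, Req) ->
    case kpro_connection:request_sync(Conn, Req, 5000) of
        ok              -> no_response;        % presumably a no_ack request
        {ok, Rsp}       -> {response, Rsp};
        {error, Reason} -> {failed, Reason}
    end.
```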
-spec send(connection(), kpro:req()) -> ok.
Same as request_async/2, except that the message to the connection process is a cast (not a call), unless the request requires no ack from Kafka, in which case a call is used to avoid message overflow. It always returns ok.

start(Host, Port, Config)

-spec start(hostname(), portnum(), config()) -> {ok, pid()} | {error, any()}.
Connect to the given endpoint. The started connection pid is linked to the caller unless nolink := true is found in Config.
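A typical lifecycle pairs start/3 with stop/1. The sketch below wraps the two so the connection is closed even when the work in between fails; the module and function names are hypothetical.

```erlang
-module(conn_example).
-export([with_connection/4]).

%% Open a connection, run Fun(Conn), and always stop the connection.
with_connection(Host, Port, Config, Fun) when is_function(Fun, 1) ->
    case kpro_connection:start(Host, Port, Config) of
        {ok, Conn} ->
            try
                {ok, Fun(Conn)}
            after
                kpro_connection:stop(Conn)
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```

Assuming a broker listening on localhost:9092, a call could look like conn_example:with_connection("localhost", 9092, #{nolink => true}, fun kpro_connection:get_api_vsns/1). With nolink left at its default, the connection is linked to the calling process, so an abnormal exit on either side propagates to the other unless the caller traps exits.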
-spec stop(connection()) -> ok | {error, any()}.
Stop the socket process.

system_code_change(State, Module, Vsn, Extra)

system_continue(Parent, Debug, State)

system_terminate(Reason, Parent, Debug, Misc)

-spec system_terminate(any(), _, _, _) -> no_return().