View Source kpro_connection (kafka_protocol v4.1.10)
Summary
Types
Connection configuration.
Functions
Return all config keys make client config management easy.
Enable/disable debugging on the socket process. debug(Pid, pring) prints debug info on stdout debug(Pid, File) prints debug info into a File debug(Pid, none) stops debugging
Send a request. Caller should expect to receive a response having
Rsp#kpro_rsp.ref
the same as Request#kpro_req.ref
unless Request#kpro_req.no_ack
is set to 'true'Send a request and wait for response for at most Timeout milliseconds.
Same as @link request_async/2. Only that the message towards connection process is a cast (not a call), unless the request requires no ack from Kafka, in which case call is used to avoid message overflow. It always return 'ok'.
Connect to the given endpoint. The started connection pid is linked to caller unless
nolink := true
is found in Config
Stop socket process.
Types
-type cfg_key() ::
connect_timeout | client_id | extra_sock_opts | debug | nolink | query_api_versions |
request_timeout | sasl | ssl.
-type cfg_val() :: term().
-type client_id() :: kpro:client_id().
Connection configuration.
It is a tuple list or map with following keys (all of them are optional):connection_timeout
: timeout (in ms) for the initial connection, defaults to 5 secondsclient_id
: string representing the client in Kafka, defaults to "kpro-client"extra_sock_opts
: extra options passed down togen_tpc
, defaults to []debug
: debugging mode, defaults to falsenolink
: whether not to link thekpro_connection
process to the caller, defaults to falsequery_api_version
: whether to query Kafka for supported API versions at the beginning, so thatkpro
can use newer APIs; theApiVersionRequest
was introduced in Kafka 0.10, so set this to false when using an older version of Kafka; defaults to truerequest_timeout
: timeout (in ms) for the actual request, defaults to 4 minutessasl
: configuration of SASL authentication, can be either{Mechanism, Username, Password}
or{Mechanism, File}
orundefined
, whereMechanism
isplain | scram_sha_256 | scram_sha_512
, andFile
is the path to a text file which contains two lines, first line for username and second line for password; defaults toundefined
ssl
: whether to use SSL, defaults tofalse
, more information can be found in brod documentation
-type connection() :: pid().
-type hostname() :: kpro:hostname().
-type portnum() :: kpro:portnum().
-type requests() :: kpro_sent_reqs:requests().
-type state() :: #state{client_id :: client_id(), parent :: pid(), config :: config(), remote :: kpro:endpoint(), sock :: gen_tcp:socket() | ssl:sslsocket(), mod :: undefined | gen_tcp | ssl, req_timeout :: undefined | timeout(), api_vsns :: undefined | kpro:vsn_ranges(), requests :: undefined | requests(), backlog :: false | queue:queue()}.
Functions
-spec all_cfg_keys() -> [cfg_key()].
-spec debug(connection(), print | string() | none) -> ok.
-spec get_api_vsns(pid()) -> {ok, undefined | kpro:vsn_ranges()} | {error, any()}.
-spec get_endpoint(pid()) -> {ok, kpro:endpoint()} | {error, any()}.
-spec request_async(connection(), kpro:req()) -> ok | {error, any()}.
Rsp#kpro_rsp.ref
the same as Request#kpro_req.ref
unless Request#kpro_req.no_ack
is set to 'true'
-spec request_sync(connection(), kpro:req(), timeout()) -> ok | {ok, kpro:rsp()} | {error, any()}.
-spec send(connection(), kpro:req()) -> ok.
nolink := true
is found in Config
-spec stop(connection()) -> ok | {error, any()}.