-type cfg_key() ::
          connect_timeout | client_id | extra_sock_opts | debug | nolink | query_api_versions |
          request_timeout | sasl | ssl.


-type cfg_val() :: term().


-type client_id() :: kpro:client_id().


-type config() :: [{cfg_key(), cfg_val()}] | #{cfg_key() => cfg_val()}.

Connection configuration.

It is a tuple list or map with the following keys (all of them are optional):

  • connect_timeout: timeout (in ms) for the initial connection, defaults to 5 seconds
  • client_id: string representing the client in Kafka, defaults to "kpro-client"
  • extra_sock_opts: extra options passed down to gen_tcp, defaults to []
  • debug: debugging mode, defaults to false
  • nolink: whether to skip linking the kpro_connection process to the caller, defaults to false
  • query_api_versions: whether to query Kafka for supported API versions at the beginning, so that kpro can use newer APIs; the ApiVersionRequest was introduced in Kafka 0.10, so set this to false when using an older version of Kafka; defaults to true
  • request_timeout: timeout (in ms) for the actual request, defaults to 4 minutes
  • sasl: configuration of SASL authentication, can be either {Mechanism, Username, Password} or {Mechanism, File} or undefined, where Mechanism is plain | scram_sha_256 | scram_sha_512, and File is the path to a text file which contains two lines, first line for username and second line for password; defaults to undefined
  • ssl: whether to use SSL, defaults to false; more information can be found in the brod documentation


-type connection() :: pid().


-type hostname() :: kpro:hostname().


-type portnum() :: kpro:portnum().


-type requests() :: kpro_sent_reqs:requests().


-type state() ::
          #state{client_id :: client_id(),
                 parent :: pid(),
                 config :: config(),
                 remote :: kpro:endpoint(),
                 sock :: gen_tcp:socket() | ssl:sslsocket(),
                 mod :: undefined | gen_tcp | ssl,
                 req_timeout :: undefined | timeout(),
                 api_vsns :: undefined | kpro:vsn_ranges(),
                 requests :: undefined | requests(),
                 backlog :: false | queue:queue()}.



-spec all_cfg_keys() -> [cfg_key()].

Return all config keys to make client config management easy.

debug(Pid, File)

-spec debug(connection(), print | string() | none) -> ok.

Enable/disable debugging on the socket process: debug(Pid, print) prints debug info on stdout; debug(Pid, File) prints debug info into File; debug(Pid, none) stops debugging.

format_status(Opt, Status)


-spec get_api_vsns(pid()) -> {ok, undefined | kpro:vsn_ranges()} | {error, any()}.


-spec get_connect_timeout(config()) -> timeout().


-spec get_endpoint(pid()) -> {ok, kpro:endpoint()} | {error, any()}.

init(Parent, Host, Port, Config)

-spec init(pid(), hostname(), portnum(), config()) -> no_return().

loop(State, Debug)

request_async(Pid, Request)

-spec request_async(connection(), kpro:req()) -> ok | {error, any()}.

Send a request. The caller should expect to receive a response whose Rsp#kpro_rsp.ref is the same as Request#kpro_req.ref, unless Request#kpro_req.no_ack is set to 'true'.

request_sync(Pid, Request, Timeout)

-spec request_sync(connection(), kpro:req(), timeout()) -> ok | {ok, kpro:rsp()} | {error, any()}.

Send a request and wait for response for at most Timeout milliseconds.

send(Pid, Kpro_req)

-spec send(connection(), kpro:req()) -> ok.

Same as request_async/2, except that the message sent to the connection process is a cast (not a call), unless the request requires no ack from Kafka, in which case a call is used to avoid message overflow. It always returns 'ok'.

start(Host, Port, Config)

-spec start(hostname(), portnum(), config()) -> {ok, pid()} | {error, any()}.

Connect to the given endpoint. The started connection pid is linked to the caller unless nolink := true is found in Config.


-spec stop(connection()) -> ok | {error, any()}.

Stop socket process.

system_code_change(State, Module, Vsn, Extra)

system_continue(Parent, Debug, State)

system_terminate(Reason, Parent, Debug, Misc)

-spec system_terminate(any(), _, _, _) -> no_return().