-type count() :: non_neg_integer().


-type primitive() :: kpro:primitive().


-type primitive_type() :: kpro:primitive_type().


copy_bytes(Size, Bin)

-spec copy_bytes(-1 | count(), binary()) -> {binary(), binary()}.

Split the first Size bytes off the binary and return them as a fresh copy, instead of a sub-binary that keeps referencing the original.
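The copying behaviour described above can be sketched with binary:copy/1. This is a minimal illustration, not the module's source: the module name copy_head_sketch and the function name copy_head/2 are hypothetical, and the -1 size accepted by the real spec is deliberately left out because its meaning is not stated here.

```erlang
-module(copy_head_sketch).
-export([copy_head/2]).

%% Hypothetical helper for illustration only.
-spec copy_head(non_neg_integer(), binary()) -> {binary(), binary()}.
copy_head(Size, Bin) ->
    %% Matching yields sub-binaries that still point into Bin.
    <<Head:Size/binary, Rest/binary>> = Bin,
    %% binary:copy/1 allocates a new binary, so holding on to Head
    %% no longer keeps the (possibly large) original alive.
    {binary:copy(Head), Rest}.
```

Copying matters mostly when a small head is kept for a long time while the large binary it came from would otherwise be garbage collected.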

decode(_, Bin)

-spec decode(tagged_fields | kpro:primitive_type(), binary()) -> {primitive(), binary()}.

Decode primitives.


-spec decode_corr_id(binary()) -> {kpro:corr_id(), binary()}.

Every Kafka response begins with a 32-bit correlation ID.
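Splitting a 32-bit prefix off a binary is a one-line pattern match in Erlang. The sketch below assumes the ID is a signed, big-endian integer (the Kafka INT32 wire type); the module name corr_id_sketch is hypothetical, and the real function may treat the value differently.

```erlang
-module(corr_id_sketch).
-export([decode_corr_id/1]).

%% Assumption: signed 32-bit, big-endian (Erlang's default endianness).
-spec decode_corr_id(binary()) -> {integer(), binary()}.
decode_corr_id(<<CorrId:32/signed-integer, Rest/binary>>) ->
    {CorrId, Rest}.
```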

encode(C, I)

-spec encode(primitive_type(), kpro:primitive()) -> iodata().

Encode primitives.

find(Field, Struct)

Find the value of a struct field. The error exception {not_struct, TheInput} is raised when the input is not a kpro:struct(), and {no_such_field, FieldName} is raised when the field is not found.

find(Field, Struct, Default)

Find the value of a struct field; return Default if the field is not found.
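The difference between the two arities can be sketched as follows. For illustration the struct is assumed to be a plain map; the module name find_sketch is hypothetical, and the code only mirrors the documented exceptions rather than reproducing the library's implementation.

```erlang
-module(find_sketch).
-export([find/2, find/3]).

%% Assumption: a struct is represented as a map.
find(Field, Struct) when is_map(Struct) ->
    case maps:find(Field, Struct) of
        {ok, Value} -> Value;
        error -> erlang:error({no_such_field, Field})
    end;
find(_Field, Other) ->
    erlang:error({not_struct, Other}).

%% Same lookup, but a missing field yields Default instead of raising.
find(Field, Struct, Default) ->
    try find(Field, Struct)
    catch error:{no_such_field, _} -> Default
    end.
```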

get_prelude_schema(Tag, Vsn)

-spec get_prelude_schema(atom(), kpro:vsn()) -> kpro:struct_schema().

get_req_schema(Api, Vsn)

-spec get_req_schema(kpro:api(), kpro:vsn()) -> kpro:struct_schema().

get_rsp_schema(Api, Vsn)

-spec get_rsp_schema(kpro:api(), kpro:vsn()) -> kpro:struct_schema().

get_ts_type(_, A)

-spec get_ts_type(byte(), byte()) -> kpro:timestamp_type().

keyfind(FieldName, FieldValue, Structs)

-spec keyfind(kpro:field_name(), kpro:field_value(), [kpro:struct()]) -> false | kpro:struct().

Search a list of structs for one whose field FieldName has the value FieldValue. An exception is raised if the given field name is not found; false is returned if no such struct exists.
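A minimal sketch of this lookup, again assuming map-based structs. The module name keyfind_sketch and the {no_such_field, Name} error term are assumptions made for the example; the description above does not say which exception the real function raises.

```erlang
-module(keyfind_sketch).
-export([keyfind/3]).

%% Assumption: each struct is a map.
-spec keyfind(term(), term(), [map()]) -> false | map().
keyfind(_Name, _Value, []) ->
    false;
keyfind(Name, Value, [Struct | Rest]) ->
    case maps:find(Name, Struct) of
        %% Value is already bound, so this clause matches only on equality.
        {ok, Value} -> Struct;
        {ok, _Other} -> keyfind(Name, Value, Rest);
        %% Assumed error term for a struct lacking the field.
        error -> erlang:error({no_such_field, Name})
    end.
```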


now_ts()

-spec now_ts() -> kpro:msg_ts().


ok_pipe(FunList)

Same as ok_pipe/2, with infinity as the default timeout.

ok_pipe(FunList, Timeout)

Function pipeline. The first function takes no arguments; each succeeding one should be an arity-0 or arity-1 function. Every function should return ok | {ok, Result} | {error, Reason}, where Result is the input argument of the next function, or the result of the pipeline if it is the last pipe node.

NOTE: If a function returns ok, the next one should be an arity-0 function. Any {error, Reason} return value causes the pipeline to abort.

NOTE: The pipe functions are delegated to an agent process for evaluation. Only exceptions and process links are propagated back to the caller; other side effects, such as monitor references, are not handled.
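The chaining rules can be sketched as a small recursive loop. This sketch runs everything in the calling process and ignores the timeout and the agent process entirely, so it only illustrates how return values are threaded from one function to the next; the module name ok_pipe_sketch is hypothetical.

```erlang
-module(ok_pipe_sketch).
-export([ok_pipe/1]).

%% The first function takes no arguments.
ok_pipe([First | Rest]) ->
    run(First(), Rest).

%% No more functions: the last return value is the pipeline result.
run(Result, []) -> Result;
%% A bare ok means the next function is called without arguments.
run(ok, [F | Rest]) -> run(F(), Rest);
%% {ok, Value} feeds Value into the next (arity-1) function.
run({ok, Value}, [F | Rest]) -> run(F(Value), Rest);
%% Any error aborts; the remaining functions are never called.
run({error, _} = Error, _Rest) -> Error.
```

For example, `ok_pipe_sketch:ok_pipe([fun() -> {ok, 1} end, fun(X) -> {ok, X + 2} end])` evaluates the second function with 1 as its argument.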

parse_endpoints(Protocol, Str)

-spec parse_endpoints(kpro:protocol() | undefined, string()) -> [kpro:endpoint()].

Parse comma-separated endpoints in a string into a list of {Host::string(), Port::integer()} pairs. Endpoints may start with a protocol prefix (case-insensitive): PLAINTEXT://, SSL://, SASL_PLAINTEXT:// or SASL_SSL://. The first argument filters the parse result down to the endpoints of the desired protocol.
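The string format can be illustrated with a simplified parser. This sketch is not the library's implementation: it drops any protocol prefix instead of filtering on it, does no whitespace trimming or validation, and uses the hypothetical module name endpoints_sketch. It relies on string:lexemes/2 and string:split/2,3, which are available from OTP 20.

```erlang
-module(endpoints_sketch).
-export([parse/1]).

-spec parse(string()) -> [{string(), integer()}].
parse(Str) ->
    [parse_one(E) || E <- string:lexemes(Str, ",")].

parse_one(Endpoint) ->
    %% Drop an optional "PROTOCOL://" prefix (the real function filters on it).
    HostPort = case string:split(Endpoint, "://") of
                   [_Protocol, HP] -> HP;
                   [HP] -> HP
               end,
    %% Split at the last colon so the port is always the final component.
    [Host, Port] = string:split(HostPort, ":", trailing),
    {Host, list_to_integer(Port)}.
```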


-spec produce_api_vsn_to_magic_vsn(kpro:vsn()) -> kpro:magic().

send_and_recv(Kpro_req, Sock, Mod, ClientId, Timeout)

-spec send_and_recv(kpro:req(), port(), module(), kpro:client_id(), timeout()) -> kpro:struct().

Send a request over a passive ({active, false}) socket and wait for the response.

send_and_recv_raw(Req, Sock, Mod, Timeout)

-spec send_and_recv_raw(iodata(), port(), module(), timeout()) -> binary().

Send a raw packet to the broker and wait for the raw response packet.

update_map(Key, Fun, Init, Map)

Equivalent to maps:update_with/4 (available since OTP 19).
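As a reminder of the maps:update_with/4 semantics: Fun is applied to the existing value when Key is present, and Init is stored as-is (without calling Fun) when Key is absent. The counter below is a small illustration; the module name update_map_sketch and the function count/2 are hypothetical.

```erlang
-module(update_map_sketch).
-export([count/2]).

%% Increment the counter for Key, starting at 1 on first sight.
-spec count(term(), map()) -> map().
count(Key, Counters) ->
    maps:update_with(Key, fun(N) -> N + 1 end, 1, Counters).
```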

with_timeout(F0, Timeout)

Delegate function evaluation to an agent process, and abort if it does not finish in time. Exceptions and linked processes are caught in the agent process and propagated to the parent process.
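The general technique of evaluating a function in a helper process with a deadline can be sketched with spawn_monitor/1. This is a simplified illustration under several assumptions: the module name with_timeout_sketch, the {error, timeout} return value and the {agent_down, Reason} error term are all made up for the example, and process links are not propagated as the real function is documented to do.

```erlang
-module(with_timeout_sketch).
-export([with_timeout/2]).

-spec with_timeout(fun(() -> term()), timeout()) -> term().
with_timeout(Fun, Timeout) ->
    Parent = self(),
    Ref = make_ref(),
    %% The agent evaluates Fun and sends the result back, tagged with Ref.
    {Pid, MRef} = spawn_monitor(fun() -> Parent ! {Ref, Fun()} end),
    receive
        {Ref, Result} ->
            erlang:demonitor(MRef, [flush]),
            Result;
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The agent died before replying, e.g. Fun raised an exception.
            erlang:error({agent_down, Reason})
    after Timeout ->
        %% Abort the agent. A reply sent just before the kill may still
        %% arrive later; this sketch does not clean that up.
        exit(Pid, kill),
        erlang:demonitor(MRef, [flush]),
        {error, timeout}
    end.
```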