kpro_lib (kafka_protocol v4.2.1)

Types

count/0

-type count() :: non_neg_integer().

primitive/0

-type primitive() :: kpro:primitive().

primitive_type/0

-type primitive_type() :: kpro:primitive_type().

Functions

copy_bytes(Size, Bin)

-spec copy_bytes(-1 | count(), binary()) -> {binary(), binary()}.

Split off the head of the binary and return it as a fresh copy, instead of a sub-binary that keeps referencing the original.
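The difference between a sub-binary and a copy can be sketched in plain Erlang. This is an illustration under assumptions, not the library's source: the module name copy_sketch and the function copy_head/2 are hypothetical, and the -1 size allowed by the spec above is deliberately left out.

```erlang
-module(copy_sketch).
-export([copy_head/2]).

%% Hypothetical helper: split Size bytes off the front of Bin and
%% return the head as a fresh binary, so that holding on to the head
%% does not keep the (possibly large) original binary alive.
-spec copy_head(non_neg_integer(), binary()) -> {binary(), binary()}.
copy_head(Size, Bin) when is_integer(Size), Size >= 0 ->
    %% Pattern matching alone yields sub-binaries that reference Bin.
    <<Head:Size/binary, Rest/binary>> = Bin,
    %% binary:copy/1 allocates a new binary with the same content.
    {binary:copy(Head), Rest}.
```

Copying is mainly worthwhile when a small head is kept for a long time while the rest of a large binary could otherwise be garbage collected.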

decode(_, Bin)

-spec decode(tagged_fields | kpro:primitive_type(), binary()) -> {primitive(), binary()}.

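Decode a primitive value of the given type from the head of the binary. Returns the decoded value together with the remaining bytes.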

decode_corr_id(_)

-spec decode_corr_id(binary()) -> {kpro:corr_id(), binary()}.

Decode the correlation ID at the head of the binary. Kafka responses begin with a 32-bit correlation ID.
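Reading a 32-bit prefix off a binary is a one-line pattern match in Erlang. The sketch below uses a hypothetical module name, corr_id_sketch, and reads the ID as an unsigned big-endian integer, which is an assumption: the documentation above does not say whether the library treats the value as signed or unsigned.

```erlang
-module(corr_id_sketch).
-export([decode_corr_id/1]).

%% Read a 32-bit big-endian integer from the head of the binary and
%% return it together with the remaining bytes.
%% Assumption: unsigned (the Erlang default for integer segments).
-spec decode_corr_id(binary()) -> {non_neg_integer(), binary()}.
decode_corr_id(<<CorrId:32/integer, Rest/binary>>) ->
    {CorrId, Rest}.
```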

encode(C, I)

-spec encode(primitive_type(), kpro:primitive()) -> iodata().

Encode a primitive value of the given type as iodata.
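To give a feel for what encoding and decoding primitives involves, here is a minimal sketch for two Kafka wire types: INT32 (4 bytes, big-endian, signed) and STRING (an INT16 length followed by that many bytes). The module prim_sketch is hypothetical and covers only these two types; it is not the library's implementation, and the type atoms int32 and string are assumed here for illustration.

```erlang
-module(prim_sketch).
-export([encode/2, decode/2]).

%% Encode a value of the given type as iodata.
-spec encode(int32 | string, integer() | binary()) -> iodata().
encode(int32, I) when is_integer(I) ->
    <<I:32/signed-integer>>;
encode(string, S) when is_binary(S) ->
    %% Length prefix first, then the raw bytes.
    [<<(byte_size(S)):16/signed-integer>>, S].

%% Decode a value from the head of the binary and return the rest.
-spec decode(int32 | string, binary()) -> {integer() | binary(), binary()}.
decode(int32, <<I:32/signed-integer, Rest/binary>>) ->
    {I, Rest};
decode(string, <<Len:16/signed-integer, S:Len/binary, Rest/binary>>) ->
    {S, Rest}.
```

Returning the unconsumed bytes from decode/2 lets a caller decode a sequence of fields by threading the remainder from one call into the next.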

find(Field, Struct)

Find a struct field value. An error exception {not_struct, TheInput} is raised when the input is not a kpro:struct(). An error exception {no_such_field, FieldName} is raised when the field is not found.

find(Field, Struct, Default)

Find a struct field value; return Default if the field is not found.

get_prelude_schema(Tag, Vsn)

-spec get_prelude_schema(atom(), kpro:vsn()) -> kpro:struct_schema().

get_req_schema(Api, Vsn)

-spec get_req_schema(kpro:api(), kpro:vsn()) -> kpro:struct_schema().

get_rsp_schema(Api, Vsn)

-spec get_rsp_schema(kpro:api(), kpro:vsn()) -> kpro:struct_schema().

get_ts_type(_, A)

-spec get_ts_type(byte(), byte()) -> kpro:timestamp_type().

keyfind(FieldName, FieldValue, Structs)

-spec keyfind(kpro:field_name(), kpro:field_value(), [kpro:struct()]) -> false | kpro:struct().

Search a list for a struct in which the given field has the given value. An exception is raised if the given field name is not found in a struct. Returns false if no such struct exists.
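The lookup rules described for find/2, find/3 and keyfind/3 can be sketched as follows. This is a simplified illustration, not the library's code: the module find_sketch is hypothetical, and it assumes structs are plain maps, whereas the real kpro:struct() type may admit other representations.

```erlang
-module(find_sketch).
-export([find/2, find/3, keyfind/3]).

%% Return the field value, or raise an error exception.
find(Field, Struct) when is_map(Struct) ->
    case maps:find(Field, Struct) of
        {ok, Value} -> Value;
        error -> erlang:error({no_such_field, Field})
    end;
find(_Field, Other) ->
    erlang:error({not_struct, Other}).

%% Return Default instead of raising when the field is missing.
find(Field, Struct, Default) when is_map(Struct) ->
    maps:get(Field, Struct, Default).

%% Walk the list until a struct has Field equal to Value.
%% A struct without Field raises via find/2.
keyfind(_Field, _Value, []) ->
    false;
keyfind(Field, Value, [Struct | Rest]) ->
    case find(Field, Struct) of
        Value -> Struct;
        _Other -> keyfind(Field, Value, Rest)
    end.
```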

now_ts()

-spec now_ts() -> kpro:msg_ts().

ok_pipe(FunList)

Same as ok_pipe/2 with infinity as default timeout.

ok_pipe(FunList, Timeout)

Function pipeline. The first function takes no arguments; all succeeding ones should be arity-0 or arity-1 functions. All functions should return ok | {ok, Result} | {error, Reason}, where Result is the input argument of the next function, or the result of the pipeline if it is the last pipe node.

NOTE: If a function returns ok, the next should be an arity-0 function. Any {error, Reason} return value causes the pipeline to abort.

NOTE: The pipe functions are delegated to an agent process for evaluation. Only exceptions and process links are propagated back to the caller; other side effects, such as monitor references, are not handled.
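The chaining rules above can be sketched in a few lines. This is a minimal illustration, not the library's implementation: the module pipe_sketch is hypothetical, and it runs the functions in the calling process, leaving out the agent process and the timeout.

```erlang
-module(pipe_sketch).
-export([ok_pipe/1, demo/0]).

%% The first function takes no arguments.
ok_pipe([First | Rest]) when is_function(First, 0) ->
    run(Rest, First()).

%% Any error aborts the pipeline immediately.
run(_Funs, {error, Reason}) ->
    {error, Reason};
%% No functions left: the last return value is the pipeline result.
run([], Result) ->
    Result;
%% A bare ok carries no value, so the next function must be arity-0.
run([F | Rest], ok) when is_function(F, 0) ->
    run(Rest, F());
%% {ok, Value} feeds Value into the next (arity-1) function.
run([F | Rest], {ok, Value}) when is_function(F, 1) ->
    run(Rest, F(Value)).

demo() ->
    ok_pipe([fun() -> {ok, 1} end,
             fun(X) -> {ok, X + 1} end,
             fun(X) -> {ok, X * 10} end]).
```

In this sketch demo/0 threads 1 through the two later steps, and a pipeline whose second function returns {error, Reason} never calls the third.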

parse_endpoints(Protocol, Str)

-spec parse_endpoints(kpro:protocol() | undefined, string()) -> [kpro:endpoint()].

Parse a string of comma-separated endpoints into a list of {Host::string(), Port::integer()} pairs. Endpoints may start with a case-insensitive protocol prefix: PLAINTEXT://, SSL://, SASL_PLAINTEXT:// or SASL_SSL://. The first argument selects which endpoints to keep from the parse result.
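A rough sketch of this kind of parsing, using only the standard string module, is shown below. It is not the library's implementation. The module endpoints_sketch is hypothetical, and several behaviors are assumptions made for the example: every endpoint must carry an explicit port, endpoints without a prefix are always kept, and undefined as the filter keeps everything.

```erlang
-module(endpoints_sketch).
-export([parse/2]).

%% Filter is a protocol atom (for example ssl) or undefined.
parse(Filter, Str) ->
    Tokens = string:lexemes(Str, ","),
    Parsed = [parse_one(string:trim(T)) || T <- Tokens],
    [{Host, Port} || {Proto, Host, Port} <- Parsed, keep(Filter, Proto)].

%% Split "PROTO://host:port" or "host:port" into its parts.
parse_one(Str) ->
    {Proto, HostPort} =
        case string:split(Str, "://") of
            [Prefix, Tail] -> {proto(string:lowercase(Prefix)), Tail};
            [NoPrefix] -> {undefined, NoPrefix}
        end,
    %% Split at the last colon so the port is always the final part.
    [Host, PortStr] = string:split(HostPort, ":", trailing),
    {Proto, Host, list_to_integer(PortStr)}.

%% Lower-cased prefix to protocol atom (case-insensitive match).
proto("plaintext") -> plaintext;
proto("ssl") -> ssl;
proto("sasl_plaintext") -> sasl_plaintext;
proto("sasl_ssl") -> sasl_ssl.

%% Assumed filter rules; see the note above.
keep(undefined, _Proto) -> true;
keep(_Filter, undefined) -> true;
keep(Filter, Proto) -> Filter =:= Proto.
```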

produce_api_vsn_to_magic_vsn(V)

-spec produce_api_vsn_to_magic_vsn(kpro:vsn()) -> kpro:magic().

send_and_recv(Kpro_req, Sock, Mod, ClientId, Timeout)

-spec send_and_recv(kpro:req(), port(), module(), kpro:client_id(), timeout()) -> kpro:struct().

Send a request over a passive-mode ({active, false}) socket and wait for the response.

send_and_recv_raw(Req, Sock, Mod, Timeout)

-spec send_and_recv_raw(iodata(), port(), module(), timeout()) -> binary().

Send a raw packet to the broker and wait for the raw response packet.

update_map(Key, Fun, Init, Map)

Equivalent to maps:update_with/4 (available since OTP 19).
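For readers unfamiliar with maps:update_with/4, the following sketch shows its semantics with the same argument order (Key, Fun, Init, Map): Fun is applied to the existing value when Key is present, otherwise Init is inserted. The module update_sketch and the function count_words/1 are made up for the example.

```erlang
-module(update_sketch).
-export([count_words/1]).

%% Count occurrences: start each new key at 1, increment existing ones.
-spec count_words([term()]) -> #{term() => pos_integer()}.
count_words(Words) ->
    lists:foldl(
      fun(W, Acc) ->
              maps:update_with(W, fun(N) -> N + 1 end, 1, Acc)
      end,
      #{},
      Words).
```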

with_timeout(F0, Timeout)

Delegate function evaluation to an agent process, and abort if it does not finish in time. Exceptions and linked processes are caught in the agent process and propagated to the parent process.
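The agent-process pattern can be sketched roughly as below. This is a simplified illustration, not the library's code: the module timeout_sketch is hypothetical, the {error, timeout} and {error, {agent_down, Info}} return shapes are assumptions, and the propagation of linked processes mentioned above is not reproduced.

```erlang
-module(timeout_sketch).
-export([with_timeout/2]).

with_timeout(Fun, Timeout) when is_function(Fun, 0) ->
    Parent = self(),
    Ref = make_ref(),
    %% Evaluate Fun in a monitored agent; catch exceptions there and
    %% ship them back so they can be re-raised in the caller.
    {Pid, MRef} = spawn_monitor(
        fun() ->
            Reply = try {ok, Fun()}
                    catch C:R:S -> {exception, C, R, S}
                    end,
            Parent ! {Ref, Reply}
        end),
    receive
        {Ref, {ok, Result}} ->
            erlang:demonitor(MRef, [flush]),
            Result;
        {Ref, {exception, Class, Reason, Stack}} ->
            erlang:demonitor(MRef, [flush]),
            erlang:raise(Class, Reason, Stack);
        {'DOWN', MRef, process, Pid, Info} ->
            %% The agent died without replying (for example, killed).
            {error, {agent_down, Info}}
    after Timeout ->
        erlang:demonitor(MRef, [flush]),
        exit(Pid, kill),
        %% Drop a reply that may have raced with the timeout.
        receive {Ref, _Late} -> ok after 0 -> ok end,
        {error, timeout}
    end.
```

Running the function in a separate process means a slow call can be killed cleanly on timeout without leaving the caller blocked.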