kpro_lib (kafka_protocol v4.1.6)

Summary

Functions

Make a copy of the head instead of keeping a reference to the original binary.
Decode primitives.
All Kafka messages begin with a 32-bit correlation ID.
Encode primitives.
Find struct field value. Error exception {not_struct, TheInput} is raised when the input is not a kpro:struct(). Error exception {no_such_field, FieldName} is raised when the field is not found.
Find struct field value, return Default if the field is not found.
Find in a list a struct that has the given field value. An exception is raised if the given field name is not found. Returns false if no such struct exists.
Same as ok_pipe/2 with infinity as default timeout.

Function pipeline. The first function takes no arguments; every subsequent function should have arity 0 or 1. Each function should return ok | {ok, Result} | {error, Reason}, where Result is the input argument of the next function, or the result of the pipeline if it is the last pipe node.

Parse comma-separated endpoints in a string into a list of {Host::string(), Port::integer()} pairs. Endpoints may start with a protocol prefix (case-insensitive): PLAINTEXT://, SSL://, SASL_PLAINTEXT:// or SASL_SSL://. The first argument selects which endpoints to keep from the parse result.
Send a request to a passive ({active, false}) socket and wait for the response.
Send a raw packet to the broker and wait for the raw response packet.
Equivalent to maps:update_with/4 (since OTP 19).
Delegate function evaluation to an agent process; abort if it does not finish in time. Exceptions and linked processes are caught in the agent process and propagated to the parent process.

Types

-type count() :: non_neg_integer().
-type primitive() :: kpro:primitive().
-type primitive_type() :: kpro:primitive_type().

Functions

-spec copy_bytes(-1 | count(), binary()) -> {binary(), binary()}.
Make a copy of the head instead of keeping a reference to the original binary.
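The reason for copying can be illustrated with a minimal sketch: a sub-binary produced by pattern matching still refers to the original binary, while binary:copy/1 returns a detached copy. The module and function names below are hypothetical and this is not the kpro_lib implementation; the -1 size accepted by copy_bytes/2 is left out because its handling is not described here.

```erlang
-module(copy_sketch).
-export([copy_head/2]).

%% Hypothetical illustration, not the kpro_lib implementation.
%% Split Bin at Size bytes and return a detached copy of the head,
%% so that holding on to the head does not keep the original binary alive.
-spec copy_head(non_neg_integer(), binary()) -> {binary(), binary()}.
copy_head(Size, Bin) ->
    <<Head:Size/binary, Rest/binary>> = Bin,
    {binary:copy(Head), Rest}.
```

In a shell, binary:referenced_byte_size/1 can be used to compare a matched sub-binary with its copy.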
-spec decode(tagged_fields | kpro:primitive_type(), binary()) -> {primitive(), binary()}.
Decode primitives.
-spec decode_corr_id(binary()) -> {kpro:corr_id(), binary()}.
All Kafka messages begin with a 32-bit correlation ID.
-spec encode(primitive_type(), kpro:primitive()) -> iodata().
Encode primitives.
Find struct field value. Error exception {not_struct, TheInput} is raised when the input is not a kpro:struct(). Error exception {no_such_field, FieldName} is raised when the field is not found.

find(Field, Struct, Default)

Find struct field value, return Default if the field is not found.

get_prelude_schema(Tag, Vsn)

-spec get_prelude_schema(atom(), kpro:vsn()) -> kpro:struct_schema().

get_req_schema(Api, Vsn)

-spec get_req_schema(kpro:api(), kpro:vsn()) -> kpro:struct_schema().

get_rsp_schema(Api, Vsn)

-spec get_rsp_schema(kpro:api(), kpro:vsn()) -> kpro:struct_schema().
-spec get_ts_type(byte(), byte()) -> kpro:timestamp_type().

keyfind(FieldName, FieldValue, Structs)

-spec keyfind(kpro:field_name(), kpro:field_value(), [kpro:struct()]) -> false | kpro:struct().
Find in a list a struct that has the given field value. An exception is raised if the given field name is not found. Returns false if no such struct exists.
-spec now_ts() -> kpro:msg_ts().
Same as ok_pipe/2 with infinity as default timeout.

ok_pipe(FunList, Timeout)

Function pipeline. The first function takes no arguments; every subsequent function should have arity 0 or 1. Each function should return ok | {ok, Result} | {error, Reason}, where Result is the input argument of the next function, or the result of the pipeline if it is the last pipe node.

NOTE: If a function returns ok the next should be an arity-0 function. Any {error, Reason} return value would cause the pipeline to abort.

NOTE: The pipe functions are delegated to an agent process for evaluation. Only exceptions and process links are propagated back to the caller; other side effects, such as monitor references, are not handled.
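The pipeline contract can be sketched in a few lines. This is a simplified, hypothetical stand-in (module pipe_sketch, functions run/1 and demo/0), not the kpro_lib code: it evaluates the functions in the calling process, has no timeout, and assumes that the return value of the last function is handed back unchanged.

```erlang
-module(pipe_sketch).
-export([run/1, demo/0]).

%% Hypothetical illustration of the documented contract, not kpro_lib code.
%% The first function takes no arguments.
run([F | Rest]) when is_function(F, 0) ->
    step(F(), Rest).

%% Any {error, Reason} aborts the pipeline.
step({error, Reason}, _Rest) -> {error, Reason};
%% Assumption: the last return value is the result of the pipeline.
step(Result, []) -> Result;
%% After a bare ok, the next function must have arity 0.
step(ok, [F | Rest]) when is_function(F, 0) -> step(F(), Rest);
%% After {ok, Value}, Value is passed to the next arity-1 function.
step({ok, Value}, [F | Rest]) when is_function(F, 1) -> step(F(Value), Rest).

demo() ->
    run([fun() -> {ok, 1} end,
         fun(N) -> {ok, N + 1} end,
         fun(_Two) -> ok end,
         fun() -> {ok, done} end]).
```

Under these assumptions demo/0 walks through all four functions, and a pipeline whose first function returns {error, Reason} stops without calling the rest.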

parse_endpoints(Protocol, Str)

-spec parse_endpoints(kpro:protocol() | undefined, string()) -> [kpro:endpoint()].
Parse comma-separated endpoints in a string into a list of {Host::string(), Port::integer()} pairs. Endpoints may start with a protocol prefix (case-insensitive): PLAINTEXT://, SSL://, SASL_PLAINTEXT:// or SASL_SSL://. The first argument selects which endpoints to keep from the parse result.

produce_api_vsn_to_magic_vsn(V)

-spec produce_api_vsn_to_magic_vsn(kpro:vsn()) -> kpro:magic().

send_and_recv(Kpro_req, Sock, Mod, ClientId, Timeout)

-spec send_and_recv(kpro:req(), port(), module(), kpro:client_id(), timeout()) -> kpro:struct().
Send a request to a passive ({active, false}) socket and wait for the response.

send_and_recv_raw(Req, Sock, Mod, Timeout)

-spec send_and_recv_raw(iodata(), port(), module(), timeout()) -> binary().
Send a raw packet to the broker and wait for the raw response packet.
Link to this function

update_map(Key, Fun, Init, Map)

Equivalent to maps:update_with/4 (since OTP 19).
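For reference, the standard maps:update_with/4 behaves as follows: if the key exists, the function is applied to its current value; otherwise the key is inserted with the initial value. The sketch below uses only the OTP function, in a hypothetical module update_with_sketch, and assumes update_map/4 takes its arguments in the same order (Key, Fun, Init, Map), as the heading above suggests.

```erlang
-module(update_with_sketch).
-export([count/1]).

%% Count occurrences of each item with the standard maps:update_with/4.
%% An existing key gets Fun applied to its value; a missing key gets Init (1).
-spec count([term()]) -> #{term() => pos_integer()}.
count(Items) ->
    lists:foldl(
      fun(Item, Acc) ->
              maps:update_with(Item, fun(N) -> N + 1 end, 1, Acc)
      end,
      #{},
      Items).
```

For example, count([a, b, a]) yields a map in which a maps to 2 and b maps to 1.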

with_timeout(F0, Timeout)

Delegate function evaluation to an agent process; abort if it does not finish in time. Exceptions and linked processes are caught in the agent process and propagated to the parent process.
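The agent-process pattern described above can be sketched roughly as follows. This is a hypothetical, simplified illustration (module timeout_sketch, function run/2), not the kpro_lib implementation: it does not track processes linked by the evaluated function, and both the {error, timeout} return value and the {agent_failed, Reason} error term are assumptions made for the example.

```erlang
-module(timeout_sketch).
-export([run/2]).

%% Hypothetical illustration of the agent-process pattern, not kpro_lib code.
%% Evaluate Fun in a monitored helper process and give up after Timeout.
%% Assumption: a timeout is reported as {error, timeout}.
-spec run(fun(() -> term()), timeout()) -> term().
run(Fun, Timeout) ->
    Parent = self(),
    Tag = make_ref(),
    {Pid, MRef} = spawn_monitor(fun() -> Parent ! {Tag, Fun()} end),
    receive
        {Tag, Result} ->
            erlang:demonitor(MRef, [flush]),
            Result;
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The agent crashed before replying: re-raise in the caller.
            erlang:error({agent_failed, Reason})
    after Timeout ->
            exit(Pid, kill),
            erlang:demonitor(MRef, [flush]),
            %% Drop a result that may have arrived just after the deadline.
            receive {Tag, _Late} -> ok after 0 -> ok end,
            {error, timeout}
    end.
```

Running the function in a separate process keeps stray messages and the crash itself away from the caller, which only ever sees the result, a re-raised error, or the timeout value.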