Module kpro_lib

Data Types

count()

count() = non_neg_integer()

primitive()

primitive() = kpro:primitive()

primitive_type()

primitive_type() = kpro:primitive_type()

Function Index

copy_bytes/2: Make a copy of the head instead of keeping a reference to the original.
decode/2: Decode primitives.
decode_corr_id/1: All Kafka messages begin with a 32-bit correlation ID.
encode/2: Encode primitives.
find/2: Find a struct field value.
find/3: Find a struct field value; return Default if the field is not found.
get_prelude_schema/2
get_req_schema/2
get_rsp_schema/2
get_ts_type/2
keyfind/3: Find, in a list, a struct having the given field value.
now_ts/0
ok_pipe/1: Same as ok_pipe/2 with infinity as default timeout.
ok_pipe/2: Function pipeline.
parse_endpoints/2: Parse comma-separated endpoints in a string into a list of {Host::string(), Port::integer()} pairs.
produce_api_vsn_to_magic_vsn/1
send_and_recv/5: Send a request to an active = false socket and wait for the response.
send_and_recv_raw/4: Send a raw packet to the broker and wait for the raw response packet.
update_map/4: Equivalent to maps:update_with/4 (available since OTP 19).
with_timeout/2: Delegate function evaluation to an agent process and abort if it does not finish in time.

Function Details

copy_bytes/2

copy_bytes(Size::-1 | count(), Bin::binary()) -> {binary(), binary()}

Make a copy of the head instead of keeping a reference to the original.
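
The idea of copying the head can be sketched as follows. This is a minimal illustration, not the library's source: the module and function names are hypothetical, and the sketch does not handle the -1 size that the spec above allows.

```erlang
-module(copy_bytes_sketch).
-export([split_copy/2]).

%% Split Bin after Size bytes and return {Head, Rest}.
%% Head is a fresh copy made with binary:copy/1, so holding on to it
%% does not keep the (possibly much larger) original binary alive.
%% Rest is left as an ordinary sub-binary of the original.
split_copy(Size, Bin) when is_integer(Size), Size >= 0 ->
    <<Head:Size/binary, Rest/binary>> = Bin,
    {binary:copy(Head), Rest}.
```

Copying is useful when a small slice of a large network buffer is stored for a long time, since a sub-binary would otherwise prevent the whole buffer from being garbage collected.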

decode/2

decode(X1::tagged_fields | kpro:primitive_type(), Bin::binary()) -> {primitive(), binary()}

Decode primitives.

decode_corr_id/1

decode_corr_id(X1::binary()) -> {kpro:corr_id(), binary()}

All Kafka messages begin with a 32-bit correlation ID.
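
A minimal sketch of extracting such a leading 32-bit field with Erlang bit syntax is shown here. The module name is hypothetical, and treating the ID as a signed big-endian integer is an assumption based on Kafka's INT32 type, not something this page states.

```erlang
-module(corr_id_sketch).
-export([decode/1]).

%% Take the first 32 bits as the correlation ID and return the
%% remaining bytes untouched. Big-endian is the bit-syntax default.
%% Assumption: the ID is read as a signed integer.
decode(<<CorrId:32/signed-integer, Rest/binary>>) ->
    {CorrId, Rest}.
```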

encode/2

encode(C::primitive_type(), I::kpro:primitive()) -> iodata()

Encode primitives.
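
To illustrate what encoding and decoding a primitive involves, here is a self-contained sketch for two Kafka wire types: a 32-bit integer and a string with a 16-bit length prefix. The module is hypothetical and covers only these two cases; the type atoms int32 and string are assumed for illustration and the real kpro:primitive_type() may name or handle them differently.

```erlang
-module(primitive_sketch).
-export([encode/2, decode/2]).

%% Encode a value of the given type to iodata.
encode(int32, I) when is_integer(I) ->
    <<I:32/signed-integer>>;
encode(string, S) ->
    B = iolist_to_binary(S),
    %% A string is a 16-bit length followed by that many bytes.
    [<<(byte_size(B)):16/signed-integer>>, B].

%% Decode one value of the given type from the head of a binary,
%% returning {Value, RemainingBytes}.
decode(int32, <<I:32/signed-integer, Rest/binary>>) ->
    {I, Rest};
decode(string, <<Len:16/signed-integer, S:Len/binary, Rest/binary>>) ->
    {S, Rest}.
```

Returning the unconsumed bytes alongside the value, as decode/2 above is documented to do, lets a caller decode consecutive fields by threading the remainder through each call.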

find/2

find(Field::kpro:field_name(), Struct::kpro:struct()) -> kpro:field_value()

Find a struct field value. An error exception {not_struct, TheInput} is raised when the input is not a kpro:struct(). An error exception {no_such_field, FieldName} is raised when the field is not found.

find/3

find(Field::kpro:field_name(), Struct::kpro:struct(), Default::kpro:field_value()) -> kpro:field_value()

Find a struct field value; return Default if the field is not found.
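
The documented behavior of find/2 and find/3 can be sketched as below. This is an illustration under the assumption that a struct is represented as a map; the module is hypothetical and kpro:struct() may accept other representations that this sketch rejects.

```erlang
-module(find_sketch).
-export([find/2, find/3]).

%% Look up Field, raising the error exceptions described above.
%% Assumption: only maps count as structs in this sketch.
find(Field, Struct) when is_map(Struct) ->
    case maps:find(Field, Struct) of
        {ok, Value} -> Value;
        error -> erlang:error({no_such_field, Field})
    end;
find(_Field, Other) ->
    erlang:error({not_struct, Other}).

%% Same lookup, but fall back to Default when the field is missing.
%% A non-struct input still raises {not_struct, Input} here.
find(Field, Struct, Default) ->
    try
        find(Field, Struct)
    catch
        error:{no_such_field, _} -> Default
    end.
```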

get_prelude_schema/2

get_prelude_schema(Tag::atom(), Vsn::kpro:vsn()) -> kpro:struct_schema()

get_req_schema/2

get_req_schema(Api::kpro:api(), Vsn::kpro:vsn()) -> kpro:struct_schema()

get_rsp_schema/2

get_rsp_schema(Api::kpro:api(), Vsn::kpro:vsn()) -> kpro:struct_schema()

get_ts_type/2

get_ts_type(X1::byte(), A::byte()) -> kpro:timestamp_type()

keyfind/3

keyfind(FieldName::kpro:field_name(), FieldValue::kpro:field_value(), Structs::[kpro:struct()]) -> false | kpro:struct()

Find, in a list, a struct having the given field value. Raises an exception if the given field name is not found; returns false if no such struct exists.
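
A minimal sketch of this search, assuming each struct is a map, might look like the following. The module is hypothetical, and the exception raised for a missing field here is the {badkey, Name} error from maps:get/2, which may differ from what the library raises.

```erlang
-module(keyfind_sketch).
-export([keyfind/3]).

%% Return the first struct whose Name field equals Value, or false.
keyfind(_Name, _Value, []) ->
    false;
keyfind(Name, Value, [Struct | Rest]) ->
    %% maps:get/2 raises if Name is absent from the struct.
    case maps:get(Name, Struct) of
        Value -> Struct;
        _Other -> keyfind(Name, Value, Rest)
    end.
```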

now_ts/0

now_ts() -> kpro:msg_ts()

ok_pipe/1

ok_pipe(FunList) -> any()

Same as ok_pipe/2 with infinity as default timeout.

ok_pipe/2

ok_pipe(FunList, Timeout) -> any()

Function pipeline. The first function takes no arguments; all succeeding ones should be arity-0 or arity-1 functions. All functions should return ok | {ok, Result} | {error, Reason}, where Result is the input argument of the next function, or the result of the pipeline if it is the last pipe node.

NOTE: If a function returns ok, the next one should be an arity-0 function. Any {error, Reason} return value causes the pipeline to abort.

NOTE: The pipe functions are delegated to an agent process for evaluation. Only exceptions and process links are propagated back to the caller; other side effects, such as monitor references, are not handled.
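
The chaining rules described above can be sketched in a few lines. This hypothetical module only models how results flow from one function to the next; it runs everything in the calling process and has no timeout, unlike the documented function.

```erlang
-module(pipe_sketch).
-export([run/1]).

%% The first function takes no arguments.
run([First | Rest]) ->
    step(First(), Rest).

%% ok: the next function is called with no arguments.
step(ok, [Fun | Rest]) ->
    step(Fun(), Rest);
%% {ok, Result}: Result becomes the argument of the next function.
step({ok, Result}, [Fun | Rest]) ->
    step(Fun(Result), Rest);
%% {error, Reason}: abort, skipping the remaining functions.
step({error, Reason}, _Rest) ->
    {error, Reason};
%% Nothing left to run: the last return value is the pipeline result.
step(Result, []) ->
    Result.
```

For instance, `pipe_sketch:run([fun() -> {ok, 2} end, fun(X) -> {ok, X * 3} end])` threads 2 into the second function.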

parse_endpoints/2

parse_endpoints(Protocol::kpro:protocol() | undefined, Str::string()) -> [kpro:endpoint()]

Parse comma-separated endpoints in a string into a list of {Host::string(), Port::integer()} pairs. Endpoints may start with a protocol prefix (case-insensitive): PLAINTEXT://, SSL://, SASL_PLAINTEXT:// or SASL_SSL://. The first argument filters the desired endpoints from the parse result.
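
The input format can be illustrated with a simplified parser. This hypothetical module is a sketch only: it discards any protocol prefix instead of filtering on it, and it requires every endpoint to carry an explicit port, both of which are assumptions made to keep the example short.

```erlang
-module(endpoints_sketch).
-export([parse/1]).

%% Parse "Host:Port,Host:Port" into [{Host, Port}].
parse(Str) ->
    [parse_one(string:trim(E)) || E <- string:split(Str, ",", all), E =/= ""].

parse_one(Endpoint) ->
    %% Drop an optional "PROTOCOL://" prefix, whatever its case.
    HostPort = case string:split(Endpoint, "://") of
                   [_Protocol, Rest] -> Rest;
                   [NoPrefix] -> NoPrefix
               end,
    %% Split on the last colon so the port is always the final part.
    [Host, Port] = string:split(HostPort, ":", trailing),
    {Host, list_to_integer(Port)}.
```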

produce_api_vsn_to_magic_vsn/1

produce_api_vsn_to_magic_vsn(V::kpro:vsn()) -> kpro:magic()

send_and_recv/5

send_and_recv(Kpro_req::kpro:req(), Sock::port(), Mod::module(), ClientId::kpro:client_id(), Timeout::timeout()) -> kpro:struct()

Send a request to an active = false socket and wait for the response.

send_and_recv_raw/4

send_and_recv_raw(Req::iodata(), Sock::port(), Mod::module(), Timeout::timeout()) -> binary()

Send a raw packet to the broker and wait for the raw response packet.

update_map/4

update_map(Key, Fun, Init, Map) -> any()

Equivalent to maps:update_with/4 (available since OTP 19).
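
As a reminder of what maps:update_with/4 does, the following sketch counts occurrences in a list. The module and function names are hypothetical; the call to maps:update_with/4 takes the same Key, Fun, Init, Map argument order as the spec above.

```erlang
-module(update_map_sketch).
-export([count/1]).

%% Build a map from item to number of occurrences.
count(Items) ->
    lists:foldl(
      fun(Item, Acc) ->
              %% Apply the fun to the existing value if Item is a key,
              %% otherwise insert Item with the initial value 1.
              maps:update_with(Item, fun(N) -> N + 1 end, 1, Acc)
      end,
      #{},
      Items).
```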

with_timeout/2

with_timeout(F0, Timeout) -> any()

Delegate function evaluation to an agent process and abort if it does not finish in time. Exceptions and linked processes are caught in the agent process and propagated to the parent process.
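
The agent-process pattern can be sketched as below. This hypothetical module is not the library's implementation: the {ok, Result}, {error, timeout} and {error, {agent_down, Reason}} return shapes are assumptions chosen for the example, and it reports an agent crash as a value rather than re-raising the exception or propagating links.

```erlang
-module(timeout_sketch).
-export([run/2]).

%% Evaluate Fun in a monitored helper process and wait up to Timeout.
run(Fun, Timeout) ->
    Parent = self(),
    Ref = make_ref(),
    {Pid, MRef} = spawn_monitor(fun() -> Parent ! {Ref, Fun()} end),
    receive
        {Ref, Result} ->
            erlang:demonitor(MRef, [flush]),
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The agent died before sending a result.
            {error, {agent_down, Reason}}
    after Timeout ->
        %% Abort the agent and discard anything it sent in the meantime.
        exit(Pid, kill),
        erlang:demonitor(MRef, [flush]),
        receive {Ref, _Late} -> ok after 0 -> ok end,
        {error, timeout}
    end.
```

Running the function in a separate process means a stuck call can be killed without leaving stray messages or timers in the caller.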


Generated by EDoc