misha_cafex v0.0.5 Cafex.Protocol.Codec behaviour

Kafka protocol request encoder and server response decoder implementation specification.

Summary

Types

The modules which implement the Decoder interface

The decode function in each decoder will return there own response struct

Callbacks

Returen the api_key of a request

Return the api_version the request will use

Decode the response message in the Kafka server response

Encode the request

Return whether the api request has a response

Types

decoder ::
  Cafex.Protocol.Metadata |
  Cafex.Protocol.Produce |
  Cafex.Protocol.Fetch |
  Cafex.Protocol.Offset |
  Cafex.Protocol.GroupCoordinator |
  Cafex.Protocol.OffsetCommit |
  Cafex.Protocol.OffsetFetch |
  Cafex.Protocol.JoinGroup |
  Cafex.Protocol.SyncGroup |
  Cafex.Protocol.Heartbeat |
  Cafex.Protocol.LeaveGroup |
  Cafex.Protocol.ListGroups |
  Cafex.Protocol.DescribeGroups

The modules which implement the Decoder interface

See Cafex.Protocol

Functions

decode_array(arg, item_decoder)

Decode kafka array

Examples

iex> decode_array(<<0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2>>, fn <<x :: 32, rest :: binary>> -> {x, rest} end)
{[1, 2], <<>>}
decode_bytes(arg)
decode_error(error_code)
decode_message(rest)

Decode message

Examples

iex> decode_message(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>)
{%Cafex.Protocol.Message{value: "hey", magic_byte: 0}, <<>>}

iex> decode_message(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>)
{%Cafex.Protocol.Message{value: "hey", key: nil, magic_byte: 0}, <<>>}

iex> decode_message(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 156, 151, 255, 143, 0, 0, 0, 0, 0, 3, 107, 101, 121, 0, 0, 0, 3, 104, 101, 121>>)
{%Cafex.Protocol.Message{value: "hey", key: "key", magic_byte: 0}, <<>>}
decode_message_set(data)

Specs

decode_message_set(binary) :: [Cafex.Protocol.Message.t]

Decode MessageSet

decode_response(decoder, arg)
encode_array(array, item_encoder)

Encode kafka array

Examples

iex> encode_array([], nil)
<<0, 0, 0, 0>>

iex> encode_array([1, 2, 3], fn x -> <<x :: 32-signed>> end)
[<<0, 0, 0, 3>>, [<<0, 0, 0, 1>>, <<0, 0, 0, 2>>, <<0, 0, 0, 3>>]]
encode_assignment(arg)

Encode assignment

Examples

iex> encode_assignment({0, [{"topic", [0, 1, 2]}], ""})
<<0, 0, 0, 33, 0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0>>

iex> encode_assignment({0, [{"topic", [3, 4]}], nil})
<<0, 0, 0, 29, 0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 255, 255, 255, 255>>
encode_bytes(data)

Specs

encode_bytes(nil | binary) :: binary

Encode bytes

Examples

iex> encode_bytes(nil)
<<255, 255, 255, 255>>

iex> encode_bytes("")
<<0, 0, 0, 0>>

iex> encode_bytes("hey")
<<0, 0, 0, 3, 104, 101, 121>>
encode_group_protocol_metadata(arg)

Encode group protocol metadata

Examples

iex> encode_group_protocol_metadata({0, ["topic_name"], nil})
<<0, 0, 0, 22, 0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 255, 255, 255, 255>>

iex> encode_group_protocol_metadata({0, ["topic_name"], ""})
<<0, 0, 0, 22, 0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 0, 0, 0, 0>>
encode_message(msg)

Specs

encode_message(Cafex.Protocol.Message.t) :: binary

Encode single kafka message

Examples

iex> encode_message(%Cafex.Protocol.Message{value: "hey", timestamp: nil})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>

iex> encode_message(%Cafex.Protocol.Message{value: "hey", timestamp: 1, magic_byte: 1, timestamp_type: :create_time})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 180, 33, 39, 101, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>

iex> encode_message(%Cafex.Protocol.Message{value: "hey", key: ""})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 104, 101, 121>>

iex> encode_message(%Cafex.Protocol.Message{value: "hey", key: "key"})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 156, 151, 255, 143, 0, 0, 0, 0, 0, 3, 107, 101, 121, 0, 0, 0, 3, 104, 101, 121>>

iex> encode_message(%Cafex.Protocol.Message{value: "hey", key: "key", magic_byte: 1, timestamp_type: :create_time})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 82, 200, 27, 221, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 107, 101, 121, 0, 0, 0, 3, 104, 101, 121>>
encode_message_set(messages)

Specs

encode_message_set([Cafex.Protocol.Message.t]) :: binary

Encode MessageSet

encode_request(client_id, correlation_id, request)
encode_string(data)

Specs

encode_string(nil | binary) :: binary

Encode string

Examples

iex> encode_string(nil)
<<255, 255>>

iex> encode_string("")
<<0, 0>>

iex> encode_string("hey")
<<0, 3, 104, 101, 121>>
parse_assignment(arg)

Parse assignment

Examples

iex> parse_assignment(<<0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0>>)
{0, [{"topic", [0, 1, 2]}], ""}

iex> parse_assignment(<<0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 255, 255, 255, 255>>)
{0, [{"topic", [3, 4]}], nil}
parse_group_protocol_metadata(arg)

Parse group protocol metadata

Examples

iex> parse_group_protocol_metadata(<<0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 255, 255, 255, 255>>)
{0, ["topic_name"], nil}

iex> parse_group_protocol_metadata(<<0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 0, 0, 0, 0>>)
{0, ["topic_name"], ""}

Callbacks

api_key(request)

Specs

Returen the api_key of a request.

api_version(request)

Specs

Return the api_version the request will use.

decode(binary)

Specs

decode(binary) :: response

Decode the response message in the Kafka server response

encode(request)

Specs

encode(request) :: binary

Encode the request

has_response?(request)

Specs

has_response?(request) :: boolean

Return whether the api request has a response.

All request expecte server reply except the produce request with a 0 required_acks.