misha_cafex v0.0.6 Cafex.Protocol.Codec behaviour
Kafka protocol request encoder and server response decoder implementation specification.
Summary
Types
The modules which implement the Decoder interface
The decode function in each decoder returns its own response struct
Functions
Decode Kafka array
Decode message
Decode MessageSet
Encode Kafka array
Encode assignment
Encode bytes
Encode group protocol metadata
Encode a single Kafka message
Encode MessageSet
Encode string
Parse assignment
Parse group protocol metadata
Callbacks
Return the api_key of a request
Return the api_version the request will use
Decode the response message in the Kafka server response
Encode the request
Return whether the api request has a response
Types
decoder ::
Cafex.Protocol.Metadata |
Cafex.Protocol.Produce |
Cafex.Protocol.Fetch |
Cafex.Protocol.Offset |
Cafex.Protocol.GroupCoordinator |
Cafex.Protocol.OffsetCommit |
Cafex.Protocol.OffsetFetch |
Cafex.Protocol.JoinGroup |
Cafex.Protocol.SyncGroup |
Cafex.Protocol.Heartbeat |
Cafex.Protocol.LeaveGroup |
Cafex.Protocol.ListGroups |
Cafex.Protocol.DescribeGroups
The modules which implement the Decoder interface
See Cafex.Protocol
response ::
Cafex.Protocol.Metadata.Response.t |
Cafex.Protocol.Produce.Response.t |
Cafex.Protocol.Fetch.Response.t |
Cafex.Protocol.Offset.Response.t |
Cafex.Protocol.GroupCoordinator.Response.t |
Cafex.Protocol.OffsetCommit.Response.t |
Cafex.Protocol.OffsetFetch.Response.t |
Cafex.Protocol.JoinGroup.Response.t |
Cafex.Protocol.SyncGroup.Response.t |
Cafex.Protocol.Heartbeat.Response.t |
Cafex.Protocol.LeaveGroup.Response.t |
Cafex.Protocol.ListGroups.Response.t |
Cafex.Protocol.DescribeGroups.Response.t
The decode function in each decoder returns its own response struct
See Cafex.Protocol
Functions
Decode Kafka array
Examples
iex> decode_array(<<0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2>>, fn <<x :: 32, rest :: binary>> -> {x, rest} end)
{[1, 2], <<>>}
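The example above suggests the wire layout of an array: a 32-bit big-endian element count followed by the elements, each consumed by the supplied decoder function. A minimal sketch of that idea follows. The module `ArrayDecodeSketch` is a hypothetical stand-in, not the Cafex source, and treating a non-positive count as an empty array is an assumption.

```elixir
defmodule ArrayDecodeSketch do
  # Hypothetical stand-in for decode_array/2; not the Cafex implementation.
  # Reads a 32-bit signed element count, then decodes that many items.
  def decode_array(<<count::32-signed, rest::binary>>, decode_item) do
    decode_items(count, rest, decode_item, [])
  end

  # Assumption: a count of 0 (or a negative "null" count) yields an empty list.
  defp decode_items(count, rest, _decode_item, acc) when count <= 0 do
    {Enum.reverse(acc), rest}
  end

  # decode_item must return {item, remaining_binary}.
  defp decode_items(count, data, decode_item, acc) do
    {item, rest} = decode_item.(data)
    decode_items(count - 1, rest, decode_item, [item | acc])
  end
end

# Same input as the documented example; returns {[1, 2], <<>>}.
ArrayDecodeSketch.decode_array(
  <<0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2>>,
  fn <<x::32, rest::binary>> -> {x, rest} end
)
```

Returning the unconsumed binary alongside the decoded list lets callers chain decoders over a single response buffer.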
Decode message
Examples
iex> decode_message(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>)
{%Cafex.Protocol.Message{value: "hey", magic_byte: 0}, <<>>}
iex> decode_message(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>)
{%Cafex.Protocol.Message{value: "hey", key: nil, magic_byte: 0}, <<>>}
iex> decode_message(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 156, 151, 255, 143, 0, 0, 0, 0, 0, 3, 107, 101, 121, 0, 0, 0, 3, 104, 101, 121>>)
{%Cafex.Protocol.Message{value: "hey", key: "key", magic_byte: 0}, <<>>}
Specs
decode_message_set(binary) :: [Cafex.Protocol.Message.t]
Decode MessageSet
Encode Kafka array
Examples
iex> encode_array([], nil)
<<0, 0, 0, 0>>
iex> encode_array([1, 2, 3], fn x -> <<x :: 32-signed>> end)
[<<0, 0, 0, 3>>, [<<0, 0, 0, 1>>, <<0, 0, 0, 2>>, <<0, 0, 0, 3>>]]
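Note that the second example returns iodata (a nested list of binaries) rather than a flat binary. The sketch below reproduces both documented results under that reading. `ArrayEncodeSketch` is a hypothetical module written for illustration, not the Cafex source.

```elixir
defmodule ArrayEncodeSketch do
  # Hypothetical stand-in for encode_array/2; not the Cafex implementation.
  # An empty list is encoded as a zero count; the encoder function is unused.
  def encode_array([], _encode_item), do: <<0::32-signed>>

  # A non-empty list is encoded as iodata: the count, then each encoded item.
  def encode_array(items, encode_item) when is_list(items) do
    [<<length(items)::32-signed>>, Enum.map(items, encode_item)]
  end
end

iodata = ArrayEncodeSketch.encode_array([1, 2, 3], fn x -> <<x::32-signed>> end)

# Flatten the iodata only when a single binary is actually needed.
binary = IO.iodata_to_binary(iodata)
```

Keeping the result as iodata avoids copying bytes when the request is later written to a socket, which accepts iodata directly.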
Encode assignment
Examples
iex> encode_assignment({0, [{"topic", [0, 1, 2]}], ""})
<<0, 0, 0, 33, 0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0>>
iex> encode_assignment({0, [{"topic", [3, 4]}], nil})
<<0, 0, 0, 29, 0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 255, 255, 255, 255>>
Specs
encode_bytes(nil | binary) :: binary
Encode bytes
Examples
iex> encode_bytes(nil)
<<255, 255, 255, 255>>
iex> encode_bytes("")
<<0, 0, 0, 0>>
iex> encode_bytes("hey")
<<0, 0, 0, 3, 104, 101, 121>>
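The three examples are consistent with a 32-bit signed length prefix, where `nil` is written as a length of -1 (`<<255, 255, 255, 255>>`). A minimal sketch under that assumption follows. `BytesSketch` and its `decode_bytes/1` helper are hypothetical and not part of Cafex.

```elixir
defmodule BytesSketch do
  # Hypothetical stand-in for encode_bytes/1; not the Cafex implementation.
  # nil is written as a length of -1, i.e. four 0xFF bytes.
  def encode_bytes(nil), do: <<255, 255, 255, 255>>

  def encode_bytes(data) when is_binary(data) do
    <<byte_size(data)::32-signed, data::binary>>
  end

  # Hypothetical inverse, added only to show the round trip.
  def decode_bytes(<<255, 255, 255, 255, rest::binary>>), do: {nil, rest}

  def decode_bytes(<<len::32-signed, data::binary-size(len), rest::binary>>) do
    {data, rest}
  end
end

# Round trip: returns {"hey", <<>>}.
BytesSketch.decode_bytes(BytesSketch.encode_bytes("hey"))
```

The distinction between `nil` and `""` survives the round trip because the two values use different length prefixes.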
Encode group protocol metadata
Examples
iex> encode_group_protocol_metadata({0, ["topic_name"], nil})
<<0, 0, 0, 22, 0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 255, 255, 255, 255>>
iex> encode_group_protocol_metadata({0, ["topic_name"], ""})
<<0, 0, 0, 22, 0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 0, 0, 0, 0>>
Specs
encode_message(Cafex.Protocol.Message.t) :: binary
Encode a single Kafka message
Examples
iex> encode_message(%Cafex.Protocol.Message{value: "hey", timestamp: nil})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 254, 46, 107, 157, 0, 0, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>
iex> encode_message(%Cafex.Protocol.Message{value: "hey", timestamp: 1, magic_byte: 1, timestamp_type: :create_time})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 180, 33, 39, 101, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 255, 255, 255, 255, 0, 0, 0, 3, 104, 101, 121>>
iex> encode_message(%Cafex.Protocol.Message{value: "hey", key: ""})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 106, 86, 37, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 104, 101, 121>>
iex> encode_message(%Cafex.Protocol.Message{value: "hey", key: "key"})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 156, 151, 255, 143, 0, 0, 0, 0, 0, 3, 107, 101, 121, 0, 0, 0, 3, 104, 101, 121>>
iex> encode_message(%Cafex.Protocol.Message{value: "hey", key: "key", magic_byte: 1, timestamp_type: :create_time})
<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 82, 200, 27, 221, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 107, 101, 121, 0, 0, 0, 3, 104, 101, 121>>
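Reading the magic_byte 0 examples byte by byte suggests this layout: an 8-byte offset, a 4-byte message size, a 4-byte CRC, one magic byte, one attributes byte, then the key and the value, each with a 32-bit length prefix. The sketch below builds that layout. `MessageSketch` is hypothetical, and the assumption that the CRC is a standard CRC-32 over everything after the CRC field (as computed by `:erlang.crc32/1`) is not confirmed by this page.

```elixir
defmodule MessageSketch do
  # Hypothetical illustration of the magic_byte 0 layout; not the Cafex source.
  def encode_message(key, value, offset \\ 0) do
    # magic byte 0, attributes 0, then length-prefixed key and value
    body = <<0::8, 0::8, encode_bytes(key)::binary, encode_bytes(value)::binary>>

    # Assumption: CRC-32 computed over the bytes that follow the CRC field.
    crc = :erlang.crc32(body)

    # The size field covers the 4-byte CRC plus the body.
    size = byte_size(body) + 4

    <<offset::64-signed, size::32-signed, crc::32, body::binary>>
  end

  defp encode_bytes(nil), do: <<255, 255, 255, 255>>
  defp encode_bytes(data), do: <<byte_size(data)::32-signed, data::binary>>
end

# A message with no key and the value "hey" is 29 bytes long,
# with a size field of 17, matching the first documented example.
message = MessageSketch.encode_message(nil, "hey")
```

The magic_byte 1 examples carry an extra 8-byte timestamp after the attributes byte, which this sketch does not model.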
Specs
encode_message_set([Cafex.Protocol.Message.t]) :: binary
Encode MessageSet
Specs
encode_string(nil | binary) :: binary
Encode string
Examples
iex> encode_string(nil)
<<255, 255>>
iex> encode_string("")
<<0, 0>>
iex> encode_string("hey")
<<0, 3, 104, 101, 121>>
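Strings differ from bytes only in the width of the length prefix: the examples show a 16-bit prefix, with `nil` written as -1 (`<<255, 255>>`). A minimal sketch under that reading follows. `StringSketch` and its `decode_string/1` helper are hypothetical and not part of Cafex.

```elixir
defmodule StringSketch do
  # Hypothetical stand-in for encode_string/1; not the Cafex implementation.
  # nil is written as a 16-bit length of -1, i.e. two 0xFF bytes.
  def encode_string(nil), do: <<255, 255>>

  def encode_string(str) when is_binary(str) do
    <<byte_size(str)::16-signed, str::binary>>
  end

  # Hypothetical inverse, added only to show the round trip.
  def decode_string(<<255, 255, rest::binary>>), do: {nil, rest}

  def decode_string(<<len::16-signed, str::binary-size(len), rest::binary>>) do
    {str, rest}
  end
end

# Round trip: returns {"hey", <<>>}.
StringSketch.decode_string(StringSketch.encode_string("hey"))
```

A signed 16-bit prefix limits strings encoded this way to 32767 bytes.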
Parse assignment
Examples
iex> parse_assignment(<<0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0>>)
{0, [{"topic", [0, 1, 2]}], ""}
iex> parse_assignment(<<0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 255, 255, 255, 255>>)
{0, [{"topic", [3, 4]}], nil}
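The two examples are consistent with this layout: a 16-bit version, a 32-bit topic count, then for each topic a 16-bit length-prefixed name and a 32-bit count of 32-bit partition ids, followed by 32-bit length-prefixed user data. The sketch below parses that layout. `AssignmentSketch` is a hypothetical module inferred from the examples, not the Cafex source, and it assumes well-formed input with a non-negative topic count.

```elixir
defmodule AssignmentSketch do
  # Hypothetical stand-in for parse_assignment/1; not the Cafex implementation.
  def parse_assignment(<<version::16-signed, topic_count::32-signed, rest::binary>>) do
    {topics, rest} = parse_topics(topic_count, rest, [])
    {user_data, _rest} = parse_bytes(rest)
    {version, topics, user_data}
  end

  defp parse_topics(0, rest, acc), do: {Enum.reverse(acc), rest}

  defp parse_topics(
         n,
         <<len::16-signed, topic::binary-size(len), count::32-signed, rest::binary>>,
         acc
       ) do
    # Each partition id occupies 4 bytes.
    size = count * 4
    <<partitions::binary-size(size), rest::binary>> = rest
    ids = for <<id::32-signed <- partitions>>, do: id
    parse_topics(n - 1, rest, [{topic, ids} | acc])
  end

  # User data: a length of -1 (four 0xFF bytes) means nil.
  defp parse_bytes(<<255, 255, 255, 255, rest::binary>>), do: {nil, rest}

  defp parse_bytes(<<len::32-signed, data::binary-size(len), rest::binary>>) do
    {data, rest}
  end
end

# Same input as the second documented example; returns {0, [{"topic", [3, 4]}], nil}.
AssignmentSketch.parse_assignment(
  <<0, 0, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 255, 255,
    255, 255>>
)
```

The input here lacks the leading 4-byte length that `encode_assignment` emits, which matches the documented examples: the parser receives the payload after the outer length prefix has been consumed.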
Parse group protocol metadata
Examples
iex> parse_group_protocol_metadata(<<0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 255, 255, 255, 255>>)
{0, ["topic_name"], nil}
iex> parse_group_protocol_metadata(<<0, 0, 0, 0, 0, 1, 0, 10, 116, 111, 112, 105, 99, 95, 110, 97, 109, 101, 0, 0, 0, 0>>)
{0, ["topic_name"], ""}
Callbacks
Specs
api_version(request) :: Cafex.Protocol.api_version
Return the api_version the request will use.
Specs
decode(binary) :: response
Decode the response message in the Kafka server response
Specs
has_response?(request) :: boolean
Return whether the api request has a response.
All requests expect a server reply, except a produce request with required_acks set to 0.
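The rule for has_response? can be expressed with pattern matching on the request. The sketch below uses hypothetical structs (`ProduceRequestSketch`, `MetadataRequestSketch`) that model only the field relevant to the rule. They are not the real Cafex request structs, whose fields are not shown on this page.

```elixir
defmodule ProduceRequestSketch do
  # Hypothetical request struct; only required_acks is modelled.
  defstruct required_acks: 1
end

defmodule MetadataRequestSketch do
  # Hypothetical request struct standing in for any non-produce request.
  defstruct topics: []
end

defmodule HasResponseSketch do
  # A produce request with required_acks of 0 is fire-and-forget.
  def has_response?(%ProduceRequestSketch{required_acks: 0}), do: false

  # Every other request expects a reply from the server.
  def has_response?(_request), do: true
end

HasResponseSketch.has_response?(%ProduceRequestSketch{required_acks: 0})
# returns false
```

A client can use this result to decide whether to wait on the socket after sending a request.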