# Ordering Buffer

In this chapter we will deal with the next [element](../glossary/glossary.md#element) in our [pipeline](../glossary/glossary.md#pipeline) - the [Ordering Buffer](../glossary/glossary.md#jitter-buffer--ordering-buffer).
As stated in the [chapter about the system architecture](02_SystemArchitecture.md), this element is responsible for ordering the incoming [packets](../glossary/glossary.md#packet), based on their sequence id.
Because Ordering Buffer is a filtering element, we need to specify both the input and the output [pads](../glossary/glossary.md#pad):

**_`lib/elements/ordering_buffer.ex`_**

```elixir
defmodule Basic.Elements.OrderingBuffer do
  use Membrane.Filter
  alias Basic.Formats.Packet

  def_input_pad :input,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: %Packet{type: :custom_packets}

  def_output_pad :output,
    flow_control: :manual,
    accepted_format: %Packet{type: :custom_packets}
 ...
end
```

Note the format specification definition there - we expect `Basic.Formats.Packet` of type `:custom_packets` to be sent on the input pad, and the same type of packets to be sent through the output pad.
In the next step let's specify how we want the state of our element to look like:

**_`lib/elements/ordering_buffer.ex`_**

```elixir
defmodule Basic.Elements.OrderingBuffer do
 ...
  @impl true
  def handle_init(_context, _options) do
    {[],
     %{
       ordered_packets: [],
       last_sent_seq_id: 0
     }}
  end
 ...
end
```

If you don't remember what is the purpose of the Ordering Buffer, please refer to the [2nd chapter](02_SystemArchitecture.md).
We will need to hold a list of ordered packets, as well as a sequence id of the packet, which most recently was sent through the output pad (we need to know if there are some packets missing between the last sent packet and the first packet in our ordered list).

Handling demands is quite straightforward:

**_`lib/elements/ordering_buffer.ex`_**

```elixir
defmodule Basic.Elements.OrderingBuffer do
 ...
  @impl true
  def handle_demand(:output, size, _unit, _context, state) do
    {[demand: {:input, size}], state}
  end
 ...
end
```

We simply send the `:demand` on the `:input` pad once we receive a demand on the `:output` pad. One packet on input corresponds to one packet on output so for each 1 unit of demand we send 1 unit of demand to the `:input` pad.

Now we can go to the main part of the Ordering Buffer implementation - the `handle_buffer/4` callback.
The purpose of this callback is to process the incoming buffer. It gets called once a new buffer is available and waiting to be processed.

**_`lib/elements/ordering_buffer.ex`_**

```elixir
defmodule Basic.Elements.OrderingBuffer do
 ...
  @impl true
  def handle_buffer(:input, buffer, _context, state) do
    packet = unzip_packet(buffer.payload)
    ordered_packets = [packet | state.ordered_packets] |> Enum.sort()
    state = %{state | ordered_packets: ordered_packets}
    [{last_seq_id, _} | _] = ordered_packets
   ...
  end

  defp unzip_packet(packet) do
    regex = ~r/^\[seq\:(?<seq_id>\d+)\](?<data>.*)$/
    %{"data" => data, "seq_id" => seq_id} = Regex.named_captures(regex, packet)
    {String.to_integer(seq_id), %Membrane.Buffer{payload: data} }
  end
 ...
end
```

First, we are taking advantage of the [Regex module](https://hexdocs.pm/elixir/1.13/Regex.html) available in the Elixir.
With the `Regex.named_captures` we can access the values of the fields defined within the regex description.
Do you remember what our packet looks like?

```
[seq:7][frameid:2][timestamp:3]data
```

Above you can see an exemplary packet. We need to fetch the value of the sequence id (in our case it is equal to 7) and get the rest of the packet.
Therefore we have defined the regex description as:

```elixir
~r/^\[seq\:(?<seq_id>\d+)\](?<data>.*)$/
```

> ### TIP - How to read the regex?
>
> - `~r/.../` stands for the `sigil_r/1` [sigil](https://elixir-lang.org/getting-started/sigils.html)
> - `^` describes the beginning of the input
> - `\[` stands for the opening square bracket ('\[') at the beginning is required to escape the char since the plain '\[' has a special meaning in the regex syntax
> - `seq` is a sequence of 's', 'e', 'q' characters (we need to adjust our regex description to match the header of the packet)
> - `\:` stands for the ':' character (we also need to escape that character since it is meaningful in the regex's syntax)
> - `(?<seq_id>\d+)` allows us to define a named capture - later one, once we use the `Regex.named_captures/2`, we will retrieve the map with 'seq_id' key and the corresponding value equal to the string described by the `\d+` 'partial' regex (which means - one or more occurrences of a decimal). Generally speaking, a named capture can be specified with the following structure: `(?<key>regex)` where `regex` is a regex description.
> - `\]` is the escaped closing square bracket character
> - `(?<data>.*)` is a named capture description that allows us to get the value of a `.*` regex (any character no or any number of times) under a `data` key.
> - `$` stands for the end of the input

The result of `Regex.named_captures/2` applied to that regex description and the exemplary packet should be following:

```elixir
{"seq_id"=>7, "data"=>"[frameid:2][timestamp:3]data"}
```

Once we unzip the header of the packet in the `handle_buffer/4` callback, we can put the incoming packet in the `ordered_packets` list and sort that list. Due to the fact, that elements of this list are tuples, whose first element is a sequence id (a value that is unique), the list will be sorted based on the sequence id.
We also get the sequence id of the first element in the updated `ordered_packets` list.

Here comes the rest of the `handle_buffer/4` definition:

**_`lib/elements/ordering_buffer.ex`_**

```elixir
defmodule Basic.Elements.OrderingBuffer do
 ...
  def handle_buffer(:input, buffer, _context, state) do
 ...
    if state.last_sent_seq_id + 1 == last_seq_id do
      {ready_packets_sequence, ordered_packets_left} =
        get_ready_packets_sequence(ordered_packets, [])

      {last_sent_seq_id, _} = List.last(ready_packets_sequence)

      state = %{
        state
        | ordered_packets: ordered_packets_left,
          last_sent_seq_id: last_sent_seq_id
      }

      ready_buffers = Enum.map(ready_packets_sequence, &elem(&1, 1))

      {[buffer: {:output, ready_buffers}], state}
    else
      {[redemand: :output], state}
    end
  end
 ...
end
```

We need to distinguish between two situations: the currently processed packet can have a sequence id which is subsequent to the sequence id of the last sent packet or there might be some packets not yet delivered to us, with sequence ids in between the last sent sequence id and the sequence id of a currently processed packet. In the second case, we should store the packet and wait for the next packets to arrive. We will accomplish that using [`redemands` mechanism](../glossary/glossary.md#redemands), which will be explained in detail in the next chapter.
However, in the first situation, we need to get the ready packet's sequence - that means, a consistent batch of packets from the `:ordered_packets`. This can be done in the following way:

**_`lib/elements/ordering_buffer.ex`_**

```elixir
defmodule Basic.Elements.OrderingBuffer do
 ...
  defp get_ready_packets_sequence([first_packet | ordered_rest], []) do
    get_ready_packets_sequence(ordered_rest, [first_packet])
  end

  defp get_ready_packets_sequence(
         [next_seq = {next_id, _} | ordered_rest],
         [{last_id, _} | _] = ready_sequence
       )
       when next_id == last_id + 1 do
    get_ready_packets_sequence(ordered_rest, [next_seq | ready_sequence])
  end

  defp get_ready_packets_sequence(ordered_packets, ready_sequence) do
    {Enum.reverse(ready_sequence), ordered_packets}
  end
 ...
end
```

Note the order of the definitions, since we are taking advantage of the pattern matching mechanism!
The algorithm implemented in the snippet above is really simple - we are recursively taking the next packet out of the `:ordered_packets` buffer until it becomes empty or there is a missing packet (`next_id == last_id + 1`) between the last taken packet and the next packet in the buffer.
Once we have a consistent batch of packets, we can update the state (both the`:ordered_packets` and the `:last_sent_seq_id` need to be updated) and output the ready packets by defining the `:buffer` action.

Test the `OrderingBuffer`:

```console
mix test test/elements/ordering_buffer_test.exs
```

Now the `OrderingBuffer` element is ready. Before we implement the next element let us introduce you to the concept of redemands.
