# RTMP Receiver

```elixir
# Use the directory containing this notebook as the current working directory.
File.cd(__DIR__)
# Keep the notebook output quiet: only messages of level :error are logged.
Logger.configure(level: :error)

# Install the notebook's dependencies. The Kino plugin is fetched from GitHub
# at the pinned tag; the remaining packages use the given version requirements.
Mix.install([
  {:membrane_core, "~> 1.0"},
  {:membrane_realtimer_plugin, "~> 0.9.0"},
  {:membrane_rtmp_plugin, "~> 0.23.3"},
  {:membrane_kino_plugin, github: "membraneframework-labs/membrane_kino_plugin", tag: "v0.3.2"}
])
```

## Description

Defines a server that receives a media stream from an RTMP source and plays it directly in the notebook.

## Pipeline definition

Here's the definition of the pipeline:

1. The RTMP source provides video and audio data.
2. The data is then parsed into suitable H264 and AAC formats.
3. Finally, the media is pushed to Kino.Player.

```elixir
defmodule RTMP.Receiver.Pipeline do
  @moduledoc """
  Pipeline that reads an RTMP stream from a TCP socket and plays it in a Kino player.

  The pipeline is started with a keyword list containing:

    * `:socket` - the TCP socket of the accepted RTMP connection,
    * `:kino` - the player passed to `Membrane.Kino.Player.Sink`.

  Children:

    * `:source` - `Membrane.RTMP.SourceBin` reading from the socket,
    * `:audio_parser` - `Membrane.AAC.Parser` producing ADTS-encapsulated AAC,
    * `:video_parser` - `Membrane.H264.Parser` producing an Annex B stream,
    * `:player` - `Membrane.Kino.Player.Sink` receiving both tracks.
  """

  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    # Fetch the options by key so that their order in the keyword list does not matter.
    socket = Keyword.fetch!(opts, :socket)
    kino = Keyword.fetch!(opts, :kino)

    source =
      child(:source, %Membrane.RTMP.SourceBin{
        socket: socket
      })

    playing_audio =
      get_child(:source)
      |> via_out(:audio)
      |> child(:audio_parser, %Membrane.AAC.Parser{
        out_encapsulation: :ADTS
      })
      |> via_in(:audio)
      |> get_child(:player)

    playing_video =
      get_child(:source)
      |> via_out(:video)
      |> child(:video_parser, %Membrane.H264.Parser{
        generate_best_effort_timestamps: %{framerate: {25, 1}},
        output_stream_structure: :annexb
      })
      |> via_in(:video)
      |> get_child(:player)

    player = child(:player, %Membrane.Kino.Player.Sink{kino: kino})

    spec = [source, playing_audio, playing_video, player]
    {[spec: spec], %{}}
  end

  # Once the source initializes, it asks for control over the TCP socket.
  # The request is forwarded to `handle_info/3`, which can also retry it later.
  @impl true
  def handle_child_notification(
        {:socket_control_needed, _socket, _source} = notification,
        :source,
        _ctx,
        state
      ) do
    send(self(), notification)

    {[], state}
  end

  def handle_child_notification(_notification, _child, _ctx, state) do
    {[], state}
  end

  # Hands the socket over to the source. When this process is not the socket
  # owner yet (presumably because the TCP server has not transferred it so far),
  # the attempt is repeated after 200 ms.
  @impl true
  def handle_info({:socket_control_needed, socket, source} = notification, _ctx, state) do
    case Membrane.RTMP.SourceBin.pass_control(socket, source) do
      :ok ->
        :ok

      {:error, :not_owner} ->
        Process.send_after(self(), notification, 200)
    end

    {[], state}
  end

  # The rest of the module is used for self-termination of the pipeline after
  # processing finishes. The sink child is named `:player`, so that is the name
  # matched here. Termination is requested with the `:terminate` action, which
  # lets the pipeline shut itself down from within its own callback.
  @impl true
  def handle_element_end_of_stream(:player, _pad, _ctx, state) do
    {[terminate: :normal], state}
  end

  @impl true
  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end
end

:ok
```

## Server

Receiving an RTMP stream requires creating a TCP server. After the connection is established, a pipeline is created using the TCP socket.

```elixir
defmodule RTMP.Receiver do
  @moduledoc """
  Starts a TCP server for incoming RTMP connections and plays the first
  received stream using `RTMP.Receiver.Pipeline`.
  """

  # The server only listens on the loopback interface.
  @server_ip {127, 0, 0, 1}

  @doc """
  Starts the TCP server, waits for the first connection and blocks until the
  pipeline spawned for that connection terminates.

  ## Options

    * `:port` - TCP port to listen on (required),
    * `:kino` - player handed to the pipeline (required).

  The TCP server is stopped before the function returns, so the port is
  released and no further connections are accepted.

  Returns `:ok`.
  """
  def run(opts) do
    # Fetch the options by key so that their order in the keyword list does not matter.
    port = Keyword.fetch!(opts, :port)
    kino = Keyword.fetch!(opts, :kino)
    parent = self()

    server_options = %Membrane.RTMP.Source.TcpServer{
      port: port,
      listen_options: [
        :binary,
        packet: :raw,
        active: false,
        ip: @server_ip
      ],
      socket_handler: fn socket ->
        # On new connection a pipeline is started
        {:ok, _supervisor, pipeline} =
          Membrane.Pipeline.start(RTMP.Receiver.Pipeline, socket: socket, kino: kino)

        send(parent, {:pipeline_spawned, pipeline})
        {:ok, pipeline}
      end
    }

    {:ok, server, pipeline} = start_server(server_options)

    try do
      await_termination(pipeline)
    after
      # Always release the listening socket, even if waiting was interrupted.
      stop_server(server)
    end
  end

  # Starts the TCP server and blocks until the socket handler reports the
  # pipeline created for the first connection.
  defp start_server(server_options) do
    {:ok, server} = Membrane.RTMP.Source.TcpServer.start_link(server_options)

    receive do
      {:pipeline_spawned, pipeline} ->
        {:ok, server, pipeline}
    end
  end

  # Stops the TCP server process and waits until it is down.
  defp stop_server(server) do
    monitor_ref = Process.monitor(server)
    # Unlink first, so that the exit of the server is not propagated to the caller.
    Process.unlink(server)
    Process.exit(server, :shutdown)

    receive do
      {:DOWN, ^monitor_ref, :process, _pid, _reason} ->
        :ok
    end
  end

  # Blocks until the pipeline process goes down, regardless of the exit reason.
  defp await_termination(pipeline) do
    monitor_ref = Process.monitor(pipeline)

    receive do
      {:DOWN, ^monitor_ref, :process, _pid, _reason} ->
        :ok
    end
  end
end

:ok
```

```elixir
# TCP port on which the RTMP server listens.
port = 1942

# Create a player with both video and audio enabled and display it in the notebook.
kino = Membrane.Kino.Player.new(video: true, audio: true)
Kino.render(kino)
# Start the server; this call blocks until the pipeline spawned for the first
# incoming connection goes down.
RTMP.Receiver.run(port: port, kino: kino)
```
