Basic Server

Mix.install([
  {:membrane_core, "~> 1.0.0"},
  {:membrane_rtsp, "~> 0.6.0"},
  {:membrane_rtp_plugin, "~> 0.24.0"},
  {:membrane_rtp_h264_plugin, "~> 0.19.0"},
  {:membrane_h264_plugin, "~> 0.9"},
  {:membrane_file_plugin, "~> 0.16.0"},
  {:membrane_realtimer_plugin, "~> 0.9.0"}
])

Introduction

In this guide, we'll create a simple RTSP server that streams an H264 file to clients over either UDP or TCP.

Before we start the server itself, we'll first create the Membrane pipeline that will serve the H264 file.

TCP/UDP Sink

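To serve media data to the client over the same TCP connection that is used for controlling the session, we need a sink that wraps each RTP packet as interleaved binary data, as described in RFC 2326, Section 10.12: every packet is prefixed with a `$` byte (0x24), a one-byte channel identifier and a two-byte length.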

defmodule BasicServer.TCP.Sink do
  @moduledoc false

  # Sink that writes every incoming buffer to an already-open TCP socket.
  #
  # Each payload is prefixed with a 4-byte header: the byte 0x24, the channel
  # number taken from the id of the pad the buffer arrived on, and the payload
  # size as a 16-bit integer.

  use Membrane.Sink

  def_input_pad(:input, accepted_format: _any, availability: :on_request)

  def_options(socket: [spec: :inet.socket()])

  @impl true
  def handle_init(_ctx, %{socket: socket}), do: {[], %{socket: socket}}

  @impl true
  def handle_buffer(Pad.ref(:input, channel), buffer, _ctx, state) do
    %{payload: payload} = buffer
    %{socket: socket} = state

    # Header and payload are joined into a single binary and sent in one call.
    header = <<0x24::8, channel::8, byte_size(payload)::16>>
    :gen_tcp.send(socket, <<header::binary, payload::binary>>)

    {[], state}
  end
end

We'll also need another sink that sends the data through an already-open UDP socket.

defmodule BasicServer.UDP.Sink do
  @moduledoc false

  # Sink that forwards the payload of every incoming buffer as a UDP datagram.
  #
  # Options:
  #   * `:socket`  - the open UDP socket used for sending
  #   * `:address` - destination IP address
  #   * `:port`    - destination port number

  use Membrane.Sink

  def_input_pad(:input, accepted_format: _any)

  # `:inet.ip_address()` is the type exported by Erlang's `:inet` module;
  # there is no `:inet.ipaddress()` type.
  def_options(
    socket: [spec: :inet.socket()],
    address: [spec: :inet.ip_address()],
    port: [spec: :inet.port_number()]
  )

  @impl true
  def handle_init(_ctx, opts) do
    # Keep only the plain fields needed for sending as the element state.
    {[], %{socket: opts.socket, address: opts.address, port: opts.port}}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    %{socket: socket, address: address, port: port} = state

    # The result of the send is not inspected; buffers are sent one by one
    # regardless of earlier failures.
    :gen_udp.send(socket, address, port, buffer.payload)
    {[], state}
  end
end

Pipeline

defmodule BasicServer.Pipeline do
  # Pipeline linking, in order: a `Membrane.File.Source`, a
  # `Membrane.H264.Parser`, a `Membrane.Realtimer`, a `Membrane.RTP.SessionBin`
  # (with the H264 payloader) and finally a TCP or UDP sink, selected by
  # `options[:transport]`.
  use Membrane.Pipeline

  require Membrane.Logger

  # Starts the pipeline, handing `options` over to `handle_init/2`.
  def start(options), do: Membrane.Pipeline.start(__MODULE__, options)

  @impl true
  def handle_init(_ctx, options) do
    Membrane.Logger.info("Start the pipeline with the following config\n#{inspect(options)}\n")

    ssrc = options[:ssrc]

    file_source = %Membrane.File.Source{location: options[:file_location]}

    h264_parser = %Membrane.H264.Parser{
      generate_best_effort_timestamps: %{framerate: {60, 1}},
      output_stream_structure: :annexb,
      output_alignment: :nalu
    }

    rtp_output_options = [
      payload_type: options[:payload_type],
      clock_rate: options[:clock_rate]
    ]

    # The same SSRC identifies both the input and the output pad of the
    # session bin.
    links =
      child(:source, file_source)
      |> child(:parser, h264_parser)
      |> child(:realtimer, Membrane.Realtimer)
      |> via_in(Pad.ref(:input, ssrc), options: [payloader: Membrane.RTP.H264.Payloader])
      |> child(:session_bin, Membrane.RTP.SessionBin)
      |> via_out(Pad.ref(:rtp_output, ssrc), options: rtp_output_options)
      |> build_sink(options)

    {[spec: links], %{}}
  end

  # Appends the sink matching the requested transport: the TCP sink when the
  # transport is `:TCP`, the UDP sink for anything else.
  defp build_sink(link_builder, options) do
    case options[:transport] do
      :TCP ->
        # The first element of the channels tuple becomes the id of the
        # sink's input pad.
        tcp_sink = %BasicServer.TCP.Sink{socket: options[:tcp_socket]}

        link_builder
        |> via_in(Pad.ref(:input, elem(options.channels, 0)))
        |> child(:sink, tcp_sink)

      _other ->
        # The first element of the client port tuple is used as the
        # destination port.
        udp_sink = %BasicServer.UDP.Sink{
          socket: options[:rtp_socket],
          address: options[:address],
          port: elem(options[:client_port], 0)
        }

        child(link_builder, :sink, udp_sink)
    end
  end
end

RTSP Request Handler

Now we need to implement the Membrane.RTSP.Server.Handler behaviour, whose callbacks answer the RTSP requests sent by the client.

defmodule BasicServer.RequestHandler do
  @moduledoc false

  # Callbacks of the `Membrane.RTSP.Server.Handler` behaviour for a single H264
  # video track served from `@file_path`.
  #
  # The handler state is a map with the following keys:
  #   * `:control_path` - control URL of the video track (`@url <> "/video"`)
  #   * `:config`       - payload type and clock rate, advertised in the SDP
  #                       and passed to the pipeline
  #   * `:pipeline_pid` - pid of the started `BasicServer.Pipeline`, or `nil`
  #                       when none has been started

  @behaviour Membrane.RTSP.Server.Handler

  require Logger

  alias Membrane.RTSP.Response

  @url "rtsp://localhost:8554/mystream"
  @file_path "./file.h264"

  # Logs the peer address and builds the initial handler state.
  @impl true
  def handle_open_connection(conn) do
    {:ok, {address, port}} = :inet.peername(conn)
    Logger.info("New connection from #{:inet.ntoa(address)}:#{port}")

    %{
      control_path: @url <> "/video",
      config: %{payload_type: 96, clock_rate: 90_000},
      pipeline_pid: nil
    }
  end

  # Replies with an SDP description containing one video media when the
  # requested path is `@url`; any other path gets a 400 response.
  @impl true
  def handle_describe(req, state) do
    Logger.info("Handle DESCRIBE request")

    if req.path == @url do
      video_config = state.config

      sdp = ExSDP.new(session_name: "MySession")

      sdp =
        ExSDP.Media.new(:video, 0, "RTP/AVP", "#{video_config.payload_type}")
        |> ExSDP.add_attribute([
          # Same control path that handle_setup/2 and handle_play/2 use.
          {"control", state.control_path},
          %ExSDP.Attribute.RTPMapping{
            clock_rate: video_config.clock_rate,
            payload_type: video_config.payload_type,
            encoding: :H264
          }
        ])
        |> then(&ExSDP.add_media(sdp, &1))

      Response.new(200)
      |> Response.with_header("Content-Type", "application/sdp")
      |> Response.with_body(to_string(sdp))
      |> then(&{&1, state})
    else
      {Response.new(400), state}
    end
  end

  # Accepts SETUP only for the video track's control path.
  @impl true
  def handle_setup(req, state) do
    Logger.info("Handle SETUP request: #{req.path}")

    if req.path == state.control_path do
      {Response.new(200), state}
    else
      {Response.new(400), state}
    end
  end

  # Starts the streaming pipeline for the configured video track.
  #
  # When a pipeline started by an earlier PLAY is still alive, no new one is
  # started: overwriting `:pipeline_pid` would leave the previous pipeline
  # running with nothing left to terminate it.
  @impl true
  def handle_play(configured_media_context, state) do
    if pipeline_running?(state) do
      Logger.info("Handle PLAY request, the pipeline is already running")
      {Response.new(200), state}
    else
      Logger.info("""
      Handle PLAY request, starting the pipeline
      config: #{inspect(configured_media_context)}
      """)

      options =
        configured_media_context[state.control_path]
        |> Map.merge(state.config)
        |> Map.put(:file_location, @file_path)

      {:ok, _sup_pid, pipeline_pid} = BasicServer.Pipeline.start(options)

      {Response.new(200), %{state | pipeline_pid: pipeline_pid}}
    end
  end

  # PAUSE is not supported.
  @impl true
  def handle_pause(state) do
    {Response.new(501), state}
  end

  # Stops the pipeline, if one was started, and clears the stored pid.
  @impl true
  def handle_teardown(state) do
    Logger.info("Handle TEARDOWN request, stopping the pipeline")
    stop_pipeline(state.pipeline_pid)
    {Response.new(200), %{state | pipeline_pid: nil}}
  end

  # Makes sure the pipeline does not outlive the client connection.
  @impl true
  def handle_closed_connection(state) do
    Logger.info("Connection closed")
    stop_pipeline(state.pipeline_pid)
    :ok
  end

  # True when a pipeline pid is stored and that process is still alive.
  defp pipeline_running?(%{pipeline_pid: pid}), do: is_pid(pid) and Process.alive?(pid)

  # Terminates the pipeline; a `nil` pid (no pipeline started) is a no-op, so
  # callers do not have to check the state first.
  defp stop_pipeline(nil), do: :ok

  defp stop_pipeline(pid) when is_pid(pid) do
    Membrane.Pipeline.terminate(pid)
    :ok
  end
end

Now we can start the server:

# Server configuration: the request handler module defined above, port 8554
# (the port that appears in the handler's `@url`), and the two UDP port
# options for RTP and RTCP.
server_options = [
  handler: BasicServer.RequestHandler,
  port: 8554,
  udp_rtp_port: 5000,
  udp_rtcp_port: 5001
]

Membrane.RTSP.Server.start_link(server_options)

Testing the server

Now that the server is started, we can test it with a player such as ffplay or VLC:

ffplay -i "rtsp://localhost:8554/mystream"

Or request the media over TCP:

ffplay -rtsp_transport tcp -i "rtsp://localhost:8554/mystream"