View Source Basic Server
Mix.install([
{:membrane_core, "~> 1.0.0"},
{:membrane_rtsp, "~> 0.6.0"},
{:membrane_rtp_plugin, "~> 0.24.0"},
{:membrane_rtp_h264_plugin, "~> 0.19.0"},
{:membrane_h264_plugin, "~> 0.9"},
{:membrane_file_plugin, "~> 0.16.0"},
{:membrane_realtimer_plugin, "~> 0.9.0"}
])
Introduction
In this guide, we'll create a simple RTSP server that streams an H264 file to clients via UDP and TCP
Before we start the server itself, we'll first create the Membrane pipeline that will serve the H264 file.
TCP/UDP Sink
To serve media data to the client over the same connection used for controlling the session, we need to create a TCP payloader sink to wrap the RTP packets as described in RFC 2326 Section 10.12
defmodule BasicServer.TCP.Sink do
@moduledoc false
use Membrane.Sink
def_input_pad(:input, accepted_format: _any, availability: :on_request)
def_options(socket: [spec: :inet.socket()])
@impl true
def handle_init(_ctx, opts) do
{[], %{socket: opts.socket}}
end
@impl true
def handle_buffer(Pad.ref(:input, channel), buffer, _ctx, state) do
:gen_tcp.send(
state.socket,
<<0x24::8, channel::8, byte_size(buffer.payload)::16, buffer.payload::binary>>
)
{[], state}
end
end
And we'll need another sink to send data using an open UDP socket
defmodule BasicServer.UDP.Sink do
@moduledoc false
use Membrane.Sink
def_input_pad(:input, accepted_format: _any)
def_options(
socket: [spec: :inet.socket()],
address: [spec: :inet.ipaddress()],
port: [spec: :inet.port_number()]
)
@impl true
def handle_init(_ctx, opts) do
{[], %{socket: opts.socket, address: opts.address, port: opts.port}}
end
@impl true
def handle_buffer(:input, buffer, _ctx, state) do
:gen_udp.send(state.socket, state.address, state.port, buffer.payload)
{[], state}
end
end
Pipeline
defmodule BasicServer.Pipeline do
use Membrane.Pipeline
require Membrane.Logger
def start(options) do
Membrane.Pipeline.start(__MODULE__, options)
end
@impl true
def handle_init(_ctx, options) do
Membrane.Logger.info("""
Start the pipeline with the following config
#{inspect(options)}
""")
spec =
child(:source, %Membrane.File.Source{location: options[:file_location]})
|> child(:parser, %Membrane.H264.Parser{
generate_best_effort_timestamps: %{framerate: {60, 1}},
output_stream_structure: :annexb,
output_alignment: :nalu
})
|> child(:realtimer, Membrane.Realtimer)
|> via_in(Pad.ref(:input, options[:ssrc]),
options: [payloader: Membrane.RTP.H264.Payloader]
)
|> child(:session_bin, Membrane.RTP.SessionBin)
|> via_out(Pad.ref(:rtp_output, options[:ssrc]),
options: [payload_type: options[:payload_type], clock_rate: options[:clock_rate]]
)
|> build_sink(options)
{[spec: spec], %{}}
end
defp build_sink(link_builder, options) do
if options[:transport] == :TCP do
link_builder
|> via_in(Pad.ref(:input, elem(options.channels, 0)))
|> child(:sink, %BasicServer.TCP.Sink{socket: options[:tcp_socket]})
else
link_builder
|> child(:sink, %BasicServer.UDP.Sink{
socket: options[:rtp_socket],
address: options[:address],
port: elem(options[:client_port], 0)
})
end
end
end
RTSP Request Handler
Now we need to implement the Membrane.RTSP.Server.Handler
behaviour
defmodule BasicServer.RequestHandler do
@moduledoc false
@behaviour Membrane.RTSP.Server.Handler
require Logger
alias Membrane.RTSP.Response
@url "rtsp://localhost:8554/mystream"
@file_path "./file.h264"
@impl true
def handle_open_connection(conn) do
{:ok, {address, port}} = :inet.peername(conn)
Logger.info("New connection from #{:inet.ntoa(address)}:#{port}")
%{
control_path: @url <> "/video",
config: %{payload_type: 96, clock_rate: 90_000},
pipeline_pid: nil
}
end
@impl true
def handle_describe(req, state) do
Logger.info("Handle DESCRIBE request")
if req.path == @url do
video_config = state[:config]
sdp = ExSDP.new(session_name: "MySession")
sdp =
ExSDP.Media.new(:video, 0, "RTP/AVP", "#{video_config.payload_type}")
|> ExSDP.add_attribute([
{"control", @url <> "/video"},
%ExSDP.Attribute.RTPMapping{
clock_rate: video_config.clock_rate,
payload_type: video_config.payload_type,
encoding: :H264
}
])
|> then(&ExSDP.add_media(sdp, &1))
Response.new(200)
|> Response.with_header("Content-Type", "application/sdp")
|> Response.with_body(to_string(sdp))
|> then(&{&1, state})
else
{Response.new(400), state}
end
end
@impl true
def handle_setup(req, state) do
Logger.info("Handle SETUP request: #{req.path}")
if req.path == state[:control_path] do
{Response.new(200), state}
else
{Response.new(400), state}
end
end
@impl true
def handle_play(configured_media_context, state) do
Logger.info("""
Handle PLAY request, starting the pipeline
config: #{inspect(configured_media_context)}
""")
options =
configured_media_context[state.control_path]
|> Map.merge(state[:config])
|> Map.put(:file_location, @file_path)
{:ok, _sup_pid, pipeline_pid} = BasicServer.Pipeline.start(options)
{Response.new(200), %{state | pipeline_pid: pipeline_pid}}
end
@impl true
def handle_pause(state) do
{Response.new(501), state}
end
@impl true
def handle_teardown(state) do
Logger.info("Handle TEARDOWN request, stopping the pipeline")
Membrane.Pipeline.terminate(state.pipeline_pid)
{Response.new(200), %{state | pipeline_pid: nil}}
end
@impl true
def handle_closed_connection(state) do
Logger.info("Connection closed")
if pid = state.pipeline_pid do
Membrane.Pipeline.terminate(pid)
end
:ok
end
end
Now we can start the server
Membrane.RTSP.Server.start_link(
handler: BasicServer.RequestHandler,
port: 8554,
udp_rtp_port: 5000,
udp_rtcp_port: 5001
)
Testing the server
Now that the server is started, we can try it by using ffplay
or vlc
.
ffplay -i "rtsp://localhost:8554/mystream"
Or request the media via TCP
ffplay -rtsp_transport tcp -i "rtsp://localhost:8554/mystream"