View Source Custom endpoint
This section outlines how to implement a custom RTC Engine endpoint that saves tracks published by the WebRTC endpoint.
Before you continue
It is recommended to read Track Lifecycle section before diving deeper in this guide. You will also find there some information about publishing a track.
HLS endpoint
Another endpoint that saves WebRTC tracks and is shipped with the engine is the HLS endpoint.
General rules
Each RTC Engine endpoint has to:
- implement
Membrane.Bin
behavior - have at least one Membrane element that will be responsible for producing or consuming data as Membrane bins are used only for logical grouping - they don't process data
- publish or/and subscribe to tracks
Implementing Membrane.Bin
To implement Membrane.Bin
behavior we have to:
use Membrane.Bin
- specify some configuration options (if needed)
- specify input and/or output pads
- implement some callbacks.
Pads definition depends on what our bin is intended to do. For example, if our endpoint does not publish any tracks but only subscribes for tracks from other endpoints it can specify only input pads. Pads should be in the following form
def_input_pad :input,
demand_unit: :buffers,
stream_format: Membrane.RTP,
availability: :on_request
def_output_pad :output,
demand_unit: :buffers,
stream_format: Membrane.RTP,
availability: :on_request
When it comes to endpoint options we will have:
rtc_engine
- pid of RTC engine. It will be used for subscribing for a trackoutput_dir_path
- output directory path i.e. where our tracks should be saved.
Putting it all together we have
defmodule RecordingEndpoint do
@moduledoc """
Endpoint for saving tracks to a disk.
Each track will be saved in a separate file.
"""
use Membrane.Bin
alias Membrane.RTC.Engine
def_options rtc_engine: [
spec: pid(),
description: "Pid of parent Engine"
],
output_dir_path: [
spec: Path.t(),
description: "Path to directory, where tracks will be saved."
]
def_input_pad :input,
demand_unit: :buffers,
accepted_format: _any,
availability: :on_request
@impl true
def handle_init(_ctx, opts) do
state = %{
output_dir_path: opts.output_dir_path,
rtc_engine: opts.rtc_engine,
}
{[], state}
end
end
Consuming data
Because Membrane.Bin
s are not capable of sending or consuming any data on their own
(they can be more thought of as logical containers) we have to add at least one
casual Membrane element that will do this for us.
In our case, we will need three elements:
Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver
- responsible for requesting the highest possible track variant and switching to other variants when the currently used one becomes inactiveMembrane.Stream.Serializer
- used for serializing incoming Membrane streamMembrane.File.Sink
- used for saving serialized data into a file
For each incoming track we will create separate serializer and sink.
Subscribing to a track
Whenever some endpoint publishes its tracks, all other endpoints receive a message
in form of {:new_tracks, tracks}
where tracks
is a list of Membrane.RTC.Engine.Track.t/0
.
To subscribe to any of the published tracks, an endpoint has to call Membrane.RTC.Engine.subscribe/4
.
After subscribing to a track, the endpoint will be notified about its readiness in
Membrane.Bin.handle_pad_added/3
callback. An example implementation of handle_pad_added
callback can look like this
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
spec = [
bin_input(pad)
|> child({:track_receiver, track_id}, %TrackReceiver{
track: Map.fetch!(state.tracks, track_id),
initial_target_variant: :high
})
|> child({:serializer, track_id}, Membrane.Stream.Serializer)
|> child({:sink, track_id}, %Membrane.File.Sink{
location: Path.join(state.output_dir_path, track_id)
})
]
{[spec: spec], state}
end
We simply link our input pad on which we will receive media data to the track receiver, serializer and then to the sink.
The endpoint will be also notified when some tracks it subscribed to are removed with
{:removed_tracks, tracks}
message.
Deeper dive into TrackReciver
After linking the input pad for a given track, our endpoint will start receiving events. The very first event is always
TrackVariantResumed
. It informs that a given track variant is available and can be requested with theRequestTrackVariant
event. The engine ensures that each endpoint always receives a track variant starting from a keyframe, so before receiving the first media packets our endpoint will receive theTrackVariantSwitched
event and finally media packets will start flowing.When some track variant is paused, the endpoint will receive the
TrackVariantPaused
event.The whole process is shown in the following diagram
Engine ---- TrackVariantResumed (:medium) ---> Endpoint Engine <--- RequestTrackVariant (:medium) ---- Endpoint Engine ---- TrackVariantResumed (:low) ---> Endpoint Engine ---- TrackVariantSwitched ---> Endpoint Engine ---- media ---> Endpoint Engine ---- TrackVariantPaused (:medium) ---> Endpoint Engine <--- RequestTrackVariant (:low) ---- Endpoint
Because this logic must be duplicated in each endpoint we encapsulated it into
TrackReceiver
that can easily be plugged in before the actual Membrane element.
Putting it all together
defmodule RecordingEndpoint do
@moduledoc """
Endpoint for saving tracks to a disk.
Each track will be saved in a separate file.
"""
use Membrane.Bin
require Membrane.Logger
alias Membrane.RTC.Engine
alias Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver
def_options rtc_engine: [
spec: pid(),
description: "Pid of parent Engine"
],
output_dir_path: [
spec: Path.t(),
description: "Path to directory, where tracks will be saved."
]
def_input_pad :input,
demand_unit: :buffers,
accepted_format: _any,
availability: :on_request
@impl true
def handle_init(_ctx, opts) do
state = %{
output_dir_path: opts.output_dir_path,
rtc_engine: opts.rtc_engine,
tracks: %{}
}
{[], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
spec = [
bin_input(pad)
|> child({:track_receiver, track_id}, %TrackReceiver{
track: Map.fetch!(state.tracks, track_id),
initial_target_variant: :high
})
|> child({:serializer, track_id}, Membrane.Stream.Serializer)
|> child({:sink, track_id}, %Membrane.File.Sink{
location: Path.join(state.output_dir_path, track_id)
})
]
{[spec: spec], state}
end
@impl true
def handle_pad_removed(Pad.ref(:input, track_id), _ctx, state) do
children = [
{:track_receiver, track_id},
{:serializer, track_id},
{:sink, track_id}
]
{[remove_children: children], state}
end
@impl true
def handle_parent_notification({:new_tracks, tracks}, ctx, state) do
{:endpoint, endpoint_id} = ctx.name
Enum.reduce_while(tracks, {[], state}, fn track, {[], state} ->
case Engine.subscribe(state.rtc_engine, endpoint_id, track.id) do
:ok ->
{:cont, {[], update_in(state, [:tracks], &Map.put(&1, track.id, track))}}
:ignored ->
{:cont, {[], state}}
{:error, reason} ->
raise "Couldn't subscribe to the track: #{inspect(track.id)}. Reason: #{inspect(reason)}"
end
end)
end
@impl true
def handle_parent_notification({:remove_tracks, _tracks}, _ctx, state) do
{[], state}
end
@impl true
def handle_parent_notification(_msg, _ctx, state) do
# ignore all other notifications
{[], state}
end
end