View Source Membrane.RTC.Engine (Membrane RTC Engine v0.23.0)
RTC Engine implementation.
RTC Engine is an abstraction layer responsible for linking together different types of Endpoints.
From the implementation point of view, RTC Engine is a Membrane.Pipeline
.
Messages
The RTC Engine works by sending messages which notify user logic about important events.
To receive RTC Engine messages you have to register your process so that RTC Engine will
know where to send them.
All messages RTC Engine can emit are described in Membrane.RTC.Engine.Message
docs.
Registering for messages
Registration can be done using register/2
e.g.
Engine.register(rtc_engine, self())
This will register your process to receive RTC Engine messages. You can register multiple processes to receive messages from an RTC Engine instance. In such a case each message will be sent to each registered process.
Endpoints
Endpoints are Membrane.Bin
s able to publish their own tracks and subscribe to tracks from other Endpoints.
One can think about Endpoint as an entity responsible for handling some specific task.
An Endpoint can be added and removed using add_endpoint/3
and remove_endpoint/2
respectively.
There are two types of Endpoints:
- Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any client.
- Client Endpoints - they are associated with some client. Associating Endpoint with client will send some Media Events to the Endpoint's Client Library e.g. one which indicates which tracks belong to which client.
Currently RTC Engine ships with the implementation of the following Endpoints:
Membrane.RTC.Engine.Endpoint.WebRTC
which is responsible for establishing a connection with some WebRTC client (mainly browser) and exchanging media with it. WebRTC Endpoint is a Client Endpoint.Membrane.RTC.Engine.Endpoint.HLS
which is responsible for receiving media tracks from all other Endpoints and saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.Membrane.RTC.Engine.Endpoint.RTSP
which is responsible for connecting to a remote RTSP stream source and sending the appropriate media track to other Endpoints. RTSP Endpoint is a Standalone Endpoint.Membrane.RTC.Engine.Endpoint.File
which is responsible for reading track from a file, payloading it into RTP, and sending it to other Endpoints. File Endpoint is a Standalone Endpoint.Membrane.RTC.Engine.Endpoint.SIP
which is responsible for establishing a connection with some SIP device (e.g. phone) and exchanging media with it. SIP Endpoint is a Standalone Endpoint.Membrane.RTC.Engine.Endpoint.Recording
which is responsible for saving incoming tracks to pointed storages. Recording Endpoint is a Standalone Endpoint.
Each of these endpoints is available as a separate package. Refer to the appropriate package for usage examples:
membrane_rtc_engine_webrtc
membrane_rtc_engine_hls
membrane_rtc_engine_rtsp
membrane_rtc_engine_file
membrane_rtc_engine_sip
membrane_rtc_engine_recording
User can also implement custom Endpoints, see Custom Endpoints guide.
Readiness state
Each endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.
Before it does, it:
- will not receive notifications about other endpoints and their metadata
- will not receive information about tracks
- will not be able to publish any tracks
- will not be able to update their metadata
When declaring itself as ready, the endpoint also has an opportunity to set their metadata.
To mark the endpoint as active, it has to send the ready_action_t/0
.
Example
@impl true
def handle_other({:media_event, %{type: "join", metadata: metadata}}, _context, state) do
{{:ok, notify: {:ready, metadata}}, state}
end
@impl true
def handle_other(:ready, _context, state) do
Membrane.Logger.debug("Succesfully activated the endpoint")
{:ok, state}
end
Summary
Types
Membrane action that will inform RTC Engine about disabling track variant by the client.
Membrane action that will inform RTC Engine about enabling track variant by the client.
Endpoint configuration options.
Membrane action that informs engine that endpoint finished processing and should be removed.
Membrane action that will cause RTC Engine to forward supplied message to the business logic.
RTC Engine configuration options.
Membrane action that will cause RTC Engine to publish some message to all other endpoints.
Types of messages that can be published to other Endpoints.
Type of messages that need to be handled by each endpoint.
A message that the Engine sends to the endpoint when it ackowledges its ready_action_t/0
Membrane action that will mark the endpoint as ready and set its metadata.
The Engine will respond with ready_ack_msg_t/0
to acknowledge your transition to ready state.
Subscription options.
Membrane action that will inform RTC Engine about track readiness.
Functions
Adds endpoint to the RTC Engine
Returns child specification for spawning under a supervisor
Returns list of the RTC Engine's endpoints.
Returns number of forwarded tracks in RTC Engine.
Returns list of the RTC Engine's tracks.
Sends message to RTC Engine endpoint. If endpoint doesn't exist message is ignored
Registers process with pid who
for receiving messages from RTC Engine
Removes endpoint from the RTC Engine
Subscribes an endpoint for a track.
Subscribes an endpoint for a track asynchronously.
Terminates the engine.
Unregisters process with pid who
from receiving messages from RTC Engine
Types
@type disable_track_variant_action_t() :: {:notify_parent, {:disable_track_variant, Membrane.RTC.Engine.Track.id(), Membrane.RTC.Engine.Track.variant()}}
Membrane action that will inform RTC Engine about disabling track variant by the client.
@type enable_track_variant_action_t() :: {:notify_parent, {:enable_track_variant, Membrane.RTC.Engine.Track.id(), Membrane.RTC.Engine.Track.variant()}}
Membrane action that will inform RTC Engine about enabling track variant by the client.
Endpoint configuration options.
id
- assigned endpoint id. If not provided it will be generated by RTC Engine.node
- node on which endpoint should be spawned. If not provided, current node is used.
@type finished_action_t() :: {:notify_parent, :finished}
Membrane action that informs engine that endpoint finished processing and should be removed.
@type forward_to_parent_action_t() :: {:notify_parent, {:forward_to_parent, message :: any()}}
Membrane action that will cause RTC Engine to forward supplied message to the business logic.
@type options_t() :: [ id: String.t(), trace_ctx: map(), display_manager?: boolean(), toilet_capacity: pos_integer() | nil ]
RTC Engine configuration options.
id
is used by logger. If not provided it will be generated.display_manager?
- set totrue
if you want to limit number of tracks sent fromMembrane.RTC.Engine.Endpoint.WebRTC
to a browser.toilet_capacity
- sets capacity of buffer between engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided it will be set to 200.
@type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}
Membrane action that will cause RTC Engine to publish some message to all other endpoints.
@type publish_message_t() :: {:new_tracks, [Membrane.RTC.Engine.Track.t()]} | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]} | {:track_metadata_updated, metadata :: any()} | {:enable_track_variant, Membrane.RTC.Engine.Track.id(), Membrane.RTC.Engine.Track.variant()} | {:disable_track_variant, Membrane.RTC.Engine.Track.id(), Membrane.RTC.Engine.Track.variant()} | {:endpoint_metadata_updated, metadata :: any()} | {:tracks_priority, tracks :: list()} | Membrane.RTC.Engine.Notifications.TrackNotification.t()
Types of messages that can be published to other Endpoints.
@type published_message_t() :: {:new_tracks, [Membrane.RTC.Engine.Track.t()]} | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]} | {:new_endpoint, Membrane.RTC.Engine.Endpoint.t()} | {:endpoint_removed, Membrane.RTC.Engine.Endpoint.id()} | {:track_metadata_updated, Membrane.RTC.Engine.Track.t()} | {:track_variant_enabled, Membrane.RTC.Engine.Track.t(), Membrane.RTC.Engine.Track.variant()} | {:track_variant_disabled, Membrane.RTC.Engine.Track.t(), Membrane.RTC.Engine.Track.variant()} | {:endpoint_metadata_updated, Membrane.RTC.Engine.Endpoint.t()} | {:tracks_priority, tracks :: list()} | ready_ack_msg_t() | Membrane.RTC.Engine.Notifications.TrackNotification.t()
Type of messages that need to be handled by each endpoint.
@type ready_ack_msg_t() :: {:ready, other_endpoints :: [Membrane.RTC.Engine.Endpoint.t()]}
A message that the Engine sends to the endpoint when it ackowledges its ready_action_t/0
@type ready_action_t() :: {:notify_parent, :ready | {:ready, metadata :: any()}}
Membrane action that will mark the endpoint as ready and set its metadata.
The Engine will respond with ready_ack_msg_t/0
to acknowledge your transition to ready state.
This action can only be used once, any further calls by an endpoint will be ignored.
@type subscription_opts_t() :: Keyword.t()
Subscription options.
@type track_ready_action_t() :: {:notify_parent, {:track_ready, Membrane.RTC.Engine.Track.id(), Membrane.RTC.Engine.Track.variant(), Membrane.RTC.Engine.Track.encoding()}}
Membrane action that will inform RTC Engine about track readiness.
Functions
@spec add_endpoint( pid :: pid(), endpoint :: Membrane.ChildrenSpec.child_definition(), opts :: endpoint_options_t() ) :: :ok
Adds endpoint to the RTC Engine
For more information refer to endpoint_options_t/0
.
Returns child specification for spawning under a supervisor
Returns list of the RTC Engine's endpoints.
Returns number of forwarded tracks in RTC Engine.
It is number of active and pending subscriptions.
@spec get_registry_name() :: atom()
@spec get_tracks(rtc_engine :: pid()) :: [Membrane.RTC.Engine.Track.t()]
Returns list of the RTC Engine's tracks.
Sends message to RTC Engine endpoint. If endpoint doesn't exist message is ignored
Registers process with pid who
for receiving messages from RTC Engine
Removes endpoint from the RTC Engine
@spec start(options :: options_t(), process_options :: GenServer.options()) :: GenServer.on_start()
@spec start_link(options :: options_t(), process_options :: GenServer.options()) :: GenServer.on_start()
@spec subscribe( rtc_engine :: pid(), endpoint_id :: String.t(), track_id :: Membrane.RTC.Engine.Track.id(), opts :: subscription_opts_t() ) :: :ok | :ignored
Subscribes an endpoint for a track.
The endpoint will be notified about track readiness in Membrane.Bin.handle_pad_added/3
callback.
endpoint_id
is the id of the endpoint, which wants to subscribe to the track. Possible return values are:
:ok
- when endpoint subscribed on track successfully:ignored
- when subscribing was impossible because the state of the engine changed e.g: the track was already removed, or subscribing endpoint was removed
@spec subscribe_async( rtc_engine :: pid(), endpoint_id :: String.t(), track_id :: Membrane.RTC.Engine.Track.id(), opts :: subscription_opts_t() ) :: reference()
Subscribes an endpoint for a track asynchronously.
Sends a message to the calling process in the form of {:subscribe_result, subscribe_ref, result}
after the subscription is completed, where the subscribe_ref
is the reference returned
upon subscription and result
is the same as returned by subscribe/3
.
@spec terminate(pid(), timeout: timeout(), force?: boolean(), asynchronous?: boolean() ) :: :ok | {:ok, pid()} | {:error, :timeout}
Terminates the engine.
Accepts three options:
asynchronous?
- if set totrue
, pipline termination won't be blocking and will be executed in the process, which pid is returned as function result. If set tofalse
, engine termination will be blocking and will be executed in the process that called this function. Defaults tofalse
.timeout
- tells how much time (ms) to wait for engine to get gracefully terminated. Defaults to 5000.force?
- if set totrue
and engine is still alive aftertimeout
, engine will be killed usingProcess.exit/2
with reason:kill
, and function will return{:error, :timeout}
. If set tofalse
and engine is still alive aftertimeout
, function will raise an error. Defaults tofalse
.
Returns:
{:ok, pid}
- if optionasynchronous?: true
was passed.:ok
- if engine was gracefully terminated withintimeout
.{:error, :timeout}
- if engine was killed after atimeout
.
Unregisters process with pid who
from receiving messages from RTC Engine