Membrane.RTC.Engine (Membrane RTC Engine v0.16.0)
RTC Engine implementation.
RTC Engine is an abstraction layer responsible for linking together different types of Endpoints.
From the implementation point of view, RTC Engine is a Membrane.Pipeline.
Messages
The RTC Engine works by sending messages which notify user logic about important events.
To receive RTC Engine messages you have to register your process so that RTC Engine will
know where to send them.
All messages RTC Engine can emit are described in the Membrane.RTC.Engine.Message docs.
Registering for messages
Registration can be done using register/2, e.g.
Engine.register(rtc_engine, self())
This will register your process to receive RTC Engine messages. You can register multiple processes to receive messages from an RTC Engine instance. In such a case each message will be sent to each registered process.
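A minimal sketch of the registration flow described above, assuming a project that already depends on membrane_rtc_engine. The module name RoomSketch and the shape of its state are hypothetical; start_link/2 and register/2 are used as documented on this page, and the handler simply logs whatever message arrives rather than matching on specific Membrane.RTC.Engine.Message structs.

```elixir
defmodule RoomSketch do
  # Hypothetical module: a process that owns an RTC Engine instance
  # and logs every message the engine sends to it.
  use GenServer

  require Logger

  alias Membrane.RTC.Engine

  def start_link(room_id), do: GenServer.start_link(__MODULE__, room_id)

  @impl true
  def init(room_id) do
    # Per the start_link/2 spec, the result follows GenServer.on_start().
    {:ok, rtc_engine} = Engine.start_link([id: room_id], [])

    # From now on, engine messages are delivered to this process.
    Engine.register(rtc_engine, self())

    {:ok, %{room_id: room_id, rtc_engine: rtc_engine}}
  end

  @impl true
  def handle_info(message, state) do
    # The possible message shapes are described in Membrane.RTC.Engine.Message.
    Logger.info("Room #{state.room_id} received: #{inspect(message)}")
    {:noreply, state}
  end
end
```

Registering a second process with the same engine would make it receive the same messages as well.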
Endpoints
Endpoints are Membrane.Bins able to publish their own tracks and subscribe to tracks from other Endpoints.
One can think of an Endpoint as an entity responsible for handling a specific task.
An Endpoint can be added and removed using add_endpoint/3 and remove_endpoint/2 respectively.
There are two types of Endpoints:
- Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any client.
- Client Endpoints - they are associated with some client. Associating an Endpoint with a client will send some Media Events to the Endpoint's Client Library, e.g. one which indicates which tracks belong to which client.
Currently RTC Engine ships with the implementation of three Endpoints:
- Membrane.RTC.Engine.Endpoint.WebRTC, which is responsible for establishing a connection with some WebRTC client (mainly browser) and exchanging media with it. WebRTC Endpoint is a Client Endpoint.
- Membrane.RTC.Engine.Endpoint.HLS, which is responsible for receiving media tracks from all other Endpoints and saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.
- Membrane.RTC.Engine.Endpoint.RTSP, which is responsible for connecting to a remote RTSP stream source and sending the appropriate media track to other Endpoints. RTSP Endpoint is a Standalone Endpoint.
Each of these endpoints is available as a separate package. Refer to the appropriate package for usage examples.
Users can also implement custom Endpoints; see the Custom Endpoints guide.
Readiness state
Each endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.
Before it does, it:
- will not receive notifications about other endpoints and their metadata
- will not receive information about tracks
- will not be able to publish any tracks
- will not be able to update its metadata
When declaring itself as ready, the endpoint also has an opportunity to set its metadata.
To mark the endpoint as active, it has to send the ready_action_t/0.
Example
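@impl true
def handle_other({:media_event, %{type: "join", metadata: metadata}}, _context, state) do
  # Declare the endpoint ready and set its metadata (see ready_action_t/0).
  {{:ok, notify: {:ready, metadata}}, state}
end

@impl true
def handle_other({:ready, _other_endpoints}, _context, state) do
  # The Engine acknowledges readiness with ready_ack_msg_t/0.
  Membrane.Logger.debug("Successfully activated the endpoint")
  {:ok, state}
end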
Types
Endpoint configuration options.
- id - assigned endpoint id. If not provided, it will be generated by RTC Engine.
- node - node on which the endpoint should be spawned. If not provided, the current node is used.
@type forward_to_parent_action_t() :: {:notify_parent, {:forward_to_parent, message :: any()}}
Membrane action that will cause RTC Engine to forward supplied message to the business logic.
@type options_t() :: [ id: String.t(), trace_ctx: map(), telemetry_label: Membrane.TelemetryMetrics.label(), display_manager?: boolean(), toilet_capacity: pos_integer() | nil ]
RTC Engine configuration options.
- id - is used by the logger. If not provided, it will be generated.
- trace_ctx - is used by OpenTelemetry. All traces from this engine will be attached to this context. An example function from which you can get the Otel Context is get_current/0 from OpenTelemetry.Ctx.
- display_manager? - set to true if you want to limit the number of tracks sent from Membrane.RTC.Engine.Endpoint.WebRTC to a browser.
- toilet_capacity - sets the capacity of the buffer between the engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided, it will be set to 200.
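As an illustration of the options above, the following sketch builds an options_t() keyword list with explicit defaults and caller overrides. The helper module EngineOptionsSketch and its build/2 function are hypothetical and not part of RTC Engine; setting display_manager? to false here is only a choice made for the example, while 200 mirrors the documented toilet_capacity default.

```elixir
defmodule EngineOptionsSketch do
  # Hypothetical helper that builds an options_t() keyword list.
  # 200 mirrors the documented default for :toilet_capacity.
  @default_toilet_capacity 200

  def build(room_id, overrides \\ []) do
    defaults = [
      id: room_id,
      display_manager?: false,
      toilet_capacity: @default_toilet_capacity
    ]

    # Values passed by the caller win over the defaults.
    Keyword.merge(defaults, overrides)
  end
end

# Raise the buffer capacity for a room where bursts of data are expected.
options = EngineOptionsSketch.build("room-1", toilet_capacity: 400)

# The result would then be passed to the engine, e.g.:
# {:ok, rtc_engine} = Membrane.RTC.Engine.start_link(options, [])
```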
@type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}
Membrane action that will cause RTC Engine to publish some message to all other endpoints.
@type publish_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:track_metadata_updated, metadata :: any()}
  | {:endpoint_metadata_updated, metadata :: any()}
  | {:tracks_priority, tracks :: list()}
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()
Types of messages that can be published to other Endpoints.
@type published_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:new_endpoint, Membrane.RTC.Engine.Endpoint.t()}
  | {:endpoint_removed, Membrane.RTC.Engine.Endpoint.id()}
  | {:track_metadata_updated, Membrane.RTC.Engine.Track.t()}
  | {:endpoint_metadata_updated, Membrane.RTC.Engine.Endpoint.t()}
  | {:tracks_priority, tracks :: list()}
  | ready_ack_msg_t()
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()
Type of messages that need to be handled by each endpoint.
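To make the message shapes in published_message_t() concrete, here is a small sketch that pattern-matches on each tuple variant. The module PublishedMessageSketch and its summarize/1 function are hypothetical; payloads are treated as opaque terms, so the sketch does not depend on the engine's structs. In a real Endpoint these clauses would live in the callback that receives messages from the engine.

```elixir
defmodule PublishedMessageSketch do
  # Hypothetical helper: maps a published_message_t() to a short summary.
  # Payloads are treated as opaque terms, so no engine structs are needed.

  def summarize({:new_tracks, tracks}) when is_list(tracks),
    do: {:tracks_added, length(tracks)}

  def summarize({:removed_tracks, tracks}) when is_list(tracks),
    do: {:tracks_removed, length(tracks)}

  def summarize({:new_endpoint, _endpoint}), do: :endpoint_added

  def summarize({:endpoint_removed, endpoint_id}),
    do: {:endpoint_removed, endpoint_id}

  def summarize({:track_metadata_updated, _track}), do: :track_metadata_updated

  def summarize({:endpoint_metadata_updated, _endpoint}),
    do: :endpoint_metadata_updated

  def summarize({:tracks_priority, tracks}) when is_list(tracks),
    do: {:tracks_priority, length(tracks)}

  # ready_ack_msg_t(): the engine's acknowledgement of the ready action.
  def summarize({:ready, other_endpoints}) when is_list(other_endpoints),
    do: {:ready, length(other_endpoints)}

  # TrackNotification structs and anything unexpected fall through here.
  def summarize(_other), do: :unhandled
end
```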
@type ready_ack_msg_t() :: {:ready, other_endpoints :: [Membrane.RTC.Engine.Endpoint.t()]}
A message that the Engine sends to the endpoint when it acknowledges its ready_action_t/0.
@type ready_action_t() :: {:notify, :ready | {:ready, metadata :: any()}}
Membrane action that will mark the endpoint as ready and set its metadata.
The Engine will respond with ready_ack_msg_t/0 to acknowledge your transition to ready state.
This action can only be used once; any further calls by an endpoint will be ignored.
@type subscription_opts_t() :: Keyword.t()
Subscription options.
@type track_ready_action_t() :: {:notify_parent, {:track_ready, Membrane.RTC.Engine.Track.id(), Membrane.RTC.Engine.Track.variant(), Membrane.RTC.Engine.Track.encoding()}}
Membrane action that will inform RTC Engine about track readiness.
Functions
@spec add_endpoint( pid :: pid(), endpoint :: Membrane.ChildrenSpec.child_definition(), opts :: endpoint_options_t() ) :: :ok | :error
Adds endpoint to the RTC Engine.
For more information refer to endpoint_options_t/0.
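A short sketch of calling add_endpoint/3 with both documented options and handling its `:ok | :error` result. The module AddEndpointSketch, the add_named/3 function, and the `:endpoint_not_added` reason are hypothetical; `endpoint` stands for whatever child definition your Endpoint bin uses.

```elixir
defmodule AddEndpointSketch do
  alias Membrane.RTC.Engine

  # `endpoint` is the child definition of an Endpoint bin, for example
  # a struct of a custom Endpoint module (not defined in this sketch).
  def add_named(rtc_engine, endpoint, endpoint_id) do
    # :id and :node are the two options listed in endpoint_options_t/0.
    case Engine.add_endpoint(rtc_engine, endpoint, id: endpoint_id, node: node()) do
      :ok -> {:ok, endpoint_id}
      :error -> {:error, :endpoint_not_added}
    end
  end
end
```

Returning the id on success keeps it at hand for a later remove_endpoint/2 call.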
Returns child specification for spawning under a supervisor
Returns list of the RTC Engine's endpoints.
@spec get_registry_name() :: atom()
Sends a message to an RTC Engine endpoint. If the endpoint doesn't exist, the message is ignored.
Registers the process with pid who to receive messages from RTC Engine.
Removes endpoint from the RTC Engine
@spec start(options :: options_t(), process_options :: GenServer.options()) :: GenServer.on_start()
@spec start_link(options :: options_t(), process_options :: GenServer.options()) :: GenServer.on_start()
@spec subscribe( rtc_engine :: pid(), endpoint_id :: String.t(), track_id :: Membrane.RTC.Engine.Track.id(), opts :: subscription_opts_t() ) :: :ok | {:error, :timeout | :invalid_track_id}
Subscribes an endpoint for a track.
The endpoint will be notified about track readiness in the Membrane.Bin.handle_pad_added/3 callback.
endpoint_id is the id of the endpoint that wants to subscribe to the track.
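The return values in the subscribe/4 spec can be handled along these lines. This is a sketch only: the module SubscribeSketch and its subscribe_all/4 function are hypothetical, and the `subscribe_fun` parameter exists purely so the logic can be exercised without a running engine. By default it points at Membrane.RTC.Engine.subscribe/4 as specified above.

```elixir
defmodule SubscribeSketch do
  alias Membrane.RTC.Engine

  # Subscribes `endpoint_id` to each track id and groups the outcomes.
  # `subscribe_fun` is injectable only so the sketch can be tried out
  # without an engine; by default it is Engine.subscribe/4.
  def subscribe_all(rtc_engine, endpoint_id, track_ids, subscribe_fun \\ &Engine.subscribe/4) do
    track_ids
    |> Enum.reduce(%{subscribed: [], failed: []}, fn track_id, acc ->
      case subscribe_fun.(rtc_engine, endpoint_id, track_id, []) do
        :ok ->
          %{acc | subscribed: [track_id | acc.subscribed]}

        {:error, reason} when reason in [:timeout, :invalid_track_id] ->
          %{acc | failed: [{track_id, reason} | acc.failed]}
      end
    end)
    # Results were prepended, so restore the original track order.
    |> Map.new(fn {key, entries} -> {key, Enum.reverse(entries)} end)
  end
end
```

A failed subscription is recorded rather than raised, so one invalid track id does not prevent the remaining subscriptions.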
Changes pipeline's playback to :stopped and terminates its process.
Unregisters the process with pid who from receiving messages from RTC Engine.