Membrane.RTC.Engine (Membrane RTC Engine v0.11.0)

RTC Engine implementation.

RTC Engine is an abstraction layer responsible for linking together different types of Endpoints. From the implementation point of view, RTC Engine is a Membrane.Pipeline.

Messages

The RTC Engine works by sending messages which notify user logic about important events. To receive RTC Engine messages you have to register your process so that RTC Engine will know where to send them. All messages RTC Engine can emit are described in Membrane.RTC.Engine.Message docs.

Registering for messages

Registration is done with register/2, e.g.:

Engine.register(rtc_engine, self())

This will register your process to receive RTC Engine messages. You can register multiple processes to receive messages from an RTC Engine instance. In such a case each message will be sent to each registered process.
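As a rough illustration of the registration flow described above, the sketch below wraps register/2 in a GenServer that collects whatever the Engine sends. MyApp.RoomObserver and its state shape are hypothetical names chosen for this example; only register/2 comes from this module. The handler deliberately matches on any message, since the concrete message shapes are documented in Membrane.RTC.Engine.Message rather than here.

```elixir
defmodule MyApp.RoomObserver do
  # Hypothetical observer process; not part of Membrane RTC Engine.
  use GenServer

  alias Membrane.RTC.Engine

  # `engine` is the pid of a running RTC Engine.
  def start_link(engine), do: GenServer.start_link(__MODULE__, engine)

  @impl true
  def init(engine) do
    # Ask the Engine to send its messages to this process.
    :ok = Engine.register(engine, self())
    {:ok, %{engine: engine, received: []}}
  end

  @impl true
  def handle_info(message, state) do
    # Catch-all: store every incoming message, newest first.
    {:noreply, %{state | received: [message | state.received]}}
  end
end
```

Assuming the return shape given in the start_link/2 spec, usage might look like `{:ok, engine} = Engine.start_link([id: "room-1"], [])` followed by `MyApp.RoomObserver.start_link(engine)`. Starting several observers this way would give each of them its own copy of every message.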

Peers

Deprecation notice

Peers are deprecated as of version 0.10.0 and will be removed in the future.

While Peers are still present in the RTC Engine, it's not recommended to use this feature in new applications.

Existing applications should take steps to move away from using the built-in concept of peers.

Each peer represents some user that can possess some metadata. In RTC Engine, each peer is represented by an endpoint. For the purposes of the Engine, a peer is an endpoint about which we happen to store additional information, metadata in particular. A peer doesn't exist without its endpoint.

Adding a peer

The only way to add a peer to the RTC Engine is to assign a peer_id to the endpoint representing it. This is done when adding an endpoint to the Engine by passing a peer_id option.

Example

:ok = Engine.add_endpoint(webrtc_endpoint, peer_id: "Peer1")

Each peer then needs to declare itself as ready before being fully connected to RTC Engine.

Readiness state

Each peer endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.

Before it does, it:

  • will not receive notifications about other peers and their metadata
  • will not receive information about tracks
  • will not be able to publish any tracks
  • will not be able to update their metadata

When declaring itself as ready, the peer also has an opportunity to set their metadata. To mark the peer as active, their endpoint has to send the ready_action_t/0.

Example

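@impl true
def handle_other({:media_event, %{type: "join", metadata: metadata}}, _context, state) do
  # Declare the peer ready and set its metadata (see ready_action_t/0).
  {{:ok, notify: {:ready, metadata}}, state}
end

@impl true
# The Engine acknowledges readiness with ready_ack_msg_t/0, i.e. {:ready, peers_in_room}.
def handle_other({:ready, _peers_in_room}, _context, state) do
  Membrane.Logger.debug("Successfully activated the peer")
  {:ok, state}
end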

Endpoints

Endpoints are Membrane.Bins able to publish their own tracks and subscribe to tracks from other Endpoints. One can think about Endpoint as an entity responsible for handling some specific task. An Endpoint can be added and removed using add_endpoint/3 and remove_endpoint/2 respectively.

There are two types of Endpoints:

  • Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any peer.
  • Peer Endpoints - they are associated with some peer. Associating an Endpoint with a Peer will cause RTC Engine to send some Media Events to the Endpoint's Client Library, e.g. one which indicates which tracks belong to which peer.

Currently RTC Engine ships with the implementation of two Endpoints:

  • Membrane.RTC.Engine.Endpoint.WebRTC which is responsible for establishing a connection with some WebRTC peer (mainly a browser) and exchanging media with it. WebRTC Endpoint is a Peer Endpoint.
  • Membrane.RTC.Engine.Endpoint.HLS which is responsible for receiving media tracks from all other Endpoints and saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.

Users can also implement custom Endpoints; see the Custom Endpoints guide.

Summary

Types

  • endpoint_options_t/0 - Endpoint configuration options.
  • forward_to_parent_action_t/0 - Membrane action that will cause RTC Engine to forward supplied message to the business logic.
  • options_t/0 - RTC Engine configuration options.
  • publish_action_t/0 - Membrane action that will cause RTC Engine to publish some message to all other endpoints.
  • publish_message_t/0 - Types of messages that can be published to other Endpoints.
  • published_message_t/0 - Type of messages that need to be handled by each endpoint.
  • ready_ack_msg_t/0 - A message that the Engine sends to the endpoint when it acknowledges its ready_action_t/0.
  • ready_action_t/0 - Membrane action that will mark the peer endpoint as ready and set its metadata. The Engine will respond with ready_ack_msg_t/0 to acknowledge your transition to ready state.
  • subscription_opts_t/0 - Subscription options.
  • track_ready_action_t/0 - Membrane action that will inform RTC Engine about track readiness.

Functions

  • add_endpoint/3 - Adds an endpoint to the RTC Engine.
  • child_spec/1 - Returns child specification for spawning under a supervisor.
  • get_endpoints/1 - Returns list of the RTC Engine's endpoints.
  • message_endpoint/3 - Sends a message to an RTC Engine endpoint. If the endpoint doesn't exist, the message is ignored.
  • register/2 - Registers the process with pid who to receive messages from the RTC Engine.
  • remove_endpoint/2 - Removes an endpoint from the RTC Engine.
  • subscribe/4 - Subscribes an endpoint for a track.
  • terminate/2 - Changes pipeline's playback to :stopped and terminates its process.
  • unregister/2 - Unregisters the process with pid who from receiving messages from the RTC Engine.

Types

@type endpoint_options_t() :: [
  endpoint_id: String.t(),
  peer_id: String.t(),
  node: node()
]

Endpoint configuration options.

  • peer_id - associate endpoint with existing peer
  • endpoint_id - assign endpoint id. If not provided it will be generated by RTC Engine. This option cannot be used together with peer_id. Endpoints associated with peers have the id peer_id.
  • node - node on which endpoint should be spawned. If not provided, current node is used.

@type forward_to_parent_action_t() ::
  {:notify_parent, {:forward_to_parent, message :: any()}}

Membrane action that will cause RTC Engine to forward supplied message to the business logic.

@type options_t() :: [
  id: String.t(),
  trace_ctx: map(),
  telemetry_label: Membrane.TelemetryMetrics.label(),
  display_manager?: boolean(),
  toilet_capacity: pos_integer() | nil
]

RTC Engine configuration options.

  • id is used by logger. If not provided it will be generated.
  • trace_ctx is used by OpenTelemetry. All traces from this engine will be attached to this context. Example function from which you can get Otel Context is get_current/0 from OpenTelemetry.Ctx.
  • display_manager? - set to true if you want to limit number of tracks sent from Membrane.RTC.Engine.Endpoint.WebRTC to a browser.
  • toilet_capacity - sets capacity of buffer between engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided it will be set to 200.

@type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}

Membrane action that will cause RTC Engine to publish some message to all other endpoints.

@type publish_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:track_metadata_updated, metadata :: any()}
  | {:peer_metadata_updated, metadata :: any()}
  | {:tracks_priority, tracks :: list()}
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()

Types of messages that can be published to other Endpoints.

@type published_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:new_peer, Membrane.RTC.Engine.Peer.t()}
  | {:peer_left, Membrane.RTC.Engine.Peer.id()}
  | {:track_metadata_updated, Membrane.RTC.Engine.Track.t()}
  | {:peer_metadata_updated, Membrane.RTC.Engine.Peer.t()}
  | {:tracks_priority, tracks :: list()}
  | ready_ack_msg_t()
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()

Type of messages that need to be handled by each endpoint.

@type ready_ack_msg_t() ::
  {:ready,
   peers_in_room :: [
     %{
       id: Membrane.RTC.Engine.Peer.id(),
       metadata: any(),
       trackIdToMetadata: %{required(Membrane.RTC.Engine.Track.id()) => any()}
     }
   ]}

A message that the Engine sends to the endpoint when it acknowledges its ready_action_t/0.

@type ready_action_t() :: {:notify, {:ready, metadata :: any()}}

Membrane action that will mark the peer endpoint as ready and set its metadata. The Engine will respond with ready_ack_msg_t/0 to acknowledge your transition to ready state.

This action can only be used once; any further calls by an endpoint will be ignored. It will also be ignored for non-peer endpoints.
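The one-shot behaviour described above can be pictured with a small toy model. This is only a sketch of the documented rule, not the Engine's implementation: MyApp.ReadinessModel, its map shape, and the :ack / :ignored return tags are all hypothetical.

```elixir
defmodule MyApp.ReadinessModel do
  @moduledoc "Toy model of the one-shot ready transition (not the Engine's code)."

  @type peer :: %{ready?: boolean(), metadata: any()}

  # A peer endpoint starts out inactive, with no metadata.
  @spec new_peer() :: peer()
  def new_peer, do: %{ready?: false, metadata: nil}

  @spec mark_ready(peer(), any()) :: {:ack | :ignored, peer()}
  # First ready action: activate the peer and store its metadata.
  def mark_ready(%{ready?: false} = peer, metadata) do
    {:ack, %{peer | ready?: true, metadata: metadata}}
  end

  # Any later ready action leaves the peer untouched.
  def mark_ready(%{ready?: true} = peer, _metadata), do: {:ignored, peer}
end
```

In this model a second ready action carrying different metadata does not overwrite the metadata set by the first one, which mirrors the "ignored" wording above.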

@type subscription_opts_t() :: Keyword.t()

Subscription options.

@type track_ready_action_t() ::
  {:notify_parent,
   {:track_ready, Membrane.RTC.Engine.Track.id(),
    Membrane.RTC.Engine.Track.encoding(), Membrane.RTC.Engine.Track.variant()}}

Membrane action that will inform RTC Engine about track readiness.

Functions

add_endpoint(pid, endpoint, opts \\ [])

@spec add_endpoint(
  pid :: pid(),
  endpoint :: Membrane.ChildrenSpec.child_definition_t(),
  opts :: endpoint_options_t()
) :: :ok | :error

Adds an endpoint to the RTC Engine.

Returns :error when there are both peer_id and endpoint_id specified in opts. For more information refer to endpoint_options_t/0.
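The rule for peer_id and endpoint_id can be summarised with a small stand-alone sketch. It models only what endpoint_options_t/0 documents and is not the Engine's own code: MyApp.EndpointOpts, resolve_id/1, and the format of the generated id are assumptions made for illustration.

```elixir
defmodule MyApp.EndpointOpts do
  @moduledoc "Toy model of the documented peer_id/endpoint_id rule (not the Engine's code)."

  @spec resolve_id(keyword()) :: {:ok, String.t()} | :error
  def resolve_id(opts) do
    case {Keyword.get(opts, :peer_id), Keyword.get(opts, :endpoint_id)} do
      # Both options given: add_endpoint/3 is documented to return :error.
      {peer_id, endpoint_id} when not is_nil(peer_id) and not is_nil(endpoint_id) ->
        :error

      # Peer endpoints use the peer id as their endpoint id.
      {peer_id, nil} when not is_nil(peer_id) ->
        {:ok, peer_id}

      # An explicit endpoint id is used as given.
      {nil, endpoint_id} when not is_nil(endpoint_id) ->
        {:ok, endpoint_id}

      # Neither given: the Engine generates an id. The format here is an assumption.
      {nil, nil} ->
        {:ok, generate_id()}
    end
  end

  # Hypothetical id generator, used only by this sketch.
  defp generate_id do
    "endpoint-" <> Integer.to_string(System.unique_integer([:positive]))
  end
end
```

A caller of add_endpoint/3 can avoid the :error result by passing at most one of the two options.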

child_spec/1

Returns child specification for spawning under a supervisor.

get_endpoints(rtc_engine)

@spec get_endpoints(rtc_engine :: pid()) :: [%{id: String.t(), type: atom()}]

Returns list of the RTC Engine's endpoints.

get_registry_name()

@spec get_registry_name() :: atom()

message_endpoint(rtc_engine, endpoint_id, message)

@spec message_endpoint(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  message :: any()
) :: :ok

Sends a message to an RTC Engine endpoint. If the endpoint doesn't exist, the message is ignored.

register(rtc_engine, who \\ self())

@spec register(rtc_engine :: pid(), who :: pid()) :: :ok

Registers the process with pid who to receive messages from the RTC Engine.

remove_endpoint(rtc_engine, id)

@spec remove_endpoint(
  pid :: pid(),
  id :: String.t()
) :: :ok

Removes an endpoint from the RTC Engine.

start(options, process_options)

@spec start(options :: options_t(), process_options :: GenServer.options()) ::
  GenServer.on_start()

start_link(options, process_options)

@spec start_link(options :: options_t(), process_options :: GenServer.options()) ::
  GenServer.on_start()

subscribe(rtc_engine, endpoint_id, track_id, opts \\ [])

@spec subscribe(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  track_id :: Membrane.RTC.Engine.Track.id(),
  opts :: subscription_opts_t()
) :: :ok | {:error, :timeout | :invalid_track_id}

Subscribes an endpoint for a track.

The endpoint will be notified about track readiness in Membrane.Bin.handle_pad_added/3 callback. endpoint_id is the id of the endpoint, which wants to subscribe to the track.
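Since subscribe/4 can return :ok or an {:error, reason} tuple, a caller usually wants to handle both. The sketch below subscribes one endpoint to several tracks and splits the results. MyApp.Subscriber, subscribe_all/4, and the injectable subscribe_fun argument are hypothetical helpers for this example; the default simply calls subscribe/4 with its default opts.

```elixir
defmodule MyApp.Subscriber do
  # Hypothetical helper module; not part of Membrane RTC Engine.
  alias Membrane.RTC.Engine

  @doc """
  Subscribes `endpoint_id` to every track in `track_ids` and reports which
  subscriptions succeeded and which failed, in the original order.
  """
  def subscribe_all(engine, endpoint_id, track_ids, subscribe_fun \\ &Engine.subscribe/3) do
    {ok, failed} =
      Enum.reduce(track_ids, {[], []}, fn track_id, {ok, failed} ->
        case subscribe_fun.(engine, endpoint_id, track_id) do
          # Subscription accepted by the Engine.
          :ok -> {[track_id | ok], failed}
          # Documented reasons are :timeout and :invalid_track_id.
          {:error, reason} -> {ok, [{track_id, reason} | failed]}
        end
      end)

    %{subscribed: Enum.reverse(ok), failed: Enum.reverse(failed)}
  end
end
```

Passing the subscribe function as an argument is purely a design choice of this sketch: it lets the result handling be exercised without a running Engine.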

terminate(pipeline, opts \\ [])

@spec terminate(pid(), Keyword.t()) :: :ok

Changes pipeline's playback to :stopped and terminates its process.

unregister(rtc_engine, who \\ self())

@spec unregister(rtc_engine :: pid(), who :: pid()) :: :ok

Unregisters the process with pid who from receiving messages from the RTC Engine.