View Source Membrane.RTC.Engine (Membrane RTC Engine v0.23.0)

RTC Engine implementation.

RTC Engine is an abstraction layer responsible for linking together different types of Endpoints. From the implementation point of view, RTC Engine is a Membrane.Pipeline.

Messages

The RTC Engine works by sending messages which notify user logic about important events. To receive RTC Engine messages you have to register your process so that RTC Engine will know where to send them. All messages RTC Engine can emit are described in Membrane.RTC.Engine.Message docs.

Registering for messages

Registration can be done using register/2 e.g.

Engine.register(rtc_engine, self())

This will register your process to receive RTC Engine messages. You can register multiple processes to receive messages from an RTC Engine instance. In such a case each message will be sent to each registered process.

Endpoints

Endpoints are Membrane.Bins able to publish their own tracks and subscribe to tracks from other Endpoints. One can think about Endpoint as an entity responsible for handling some specific task. An Endpoint can be added and removed using add_endpoint/3 and remove_endpoint/2 respectively.

There are two types of Endpoints:

  • Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any client.
  • Client Endpoints - they are associated with some client. Associating Endpoint with client will send some Media Events to the Endpoint's Client Library e.g. one which indicates which tracks belong to which client.

Currently RTC Engine ships with the implementation of the following Endpoints:

  • Membrane.RTC.Engine.Endpoint.WebRTC which is responsible for establishing a connection with some WebRTC client (mainly browser) and exchanging media with it. WebRTC Endpoint is a Client Endpoint.
  • Membrane.RTC.Engine.Endpoint.HLS which is responsible for receiving media tracks from all other Endpoints and saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.RTSP which is responsible for connecting to a remote RTSP stream source and sending the appropriate media track to other Endpoints. RTSP Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.File which is responsible for reading track from a file, payloading it into RTP, and sending it to other Endpoints. File Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.SIP which is responsible for establishing a connection with some SIP device (e.g. phone) and exchanging media with it. SIP Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.Recording which is responsible for saving incoming tracks to pointed storages. Recording Endpoint is a Standalone Endpoint.

Each of these endpoints is available as a separate package. Refer to the appropriate package for usage examples:

User can also implement custom Endpoints, see Custom Endpoints guide.

Readiness state

Each endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.

Before it does, it:

  • will not receive notifications about other endpoints and their metadata
  • will not receive information about tracks
  • will not be able to publish any tracks
  • will not be able to update their metadata

When declaring itself as ready, the endpoint also has an opportunity to set their metadata. To mark the endpoint as active, it has to send the ready_action_t/0.

Example

@impl true
def handle_other({:media_event, %{type: "join", metadata: metadata}}, _context, state) do
  {{:ok, notify: {:ready, metadata}}, state}
end

@impl true
def handle_other(:ready, _context, state) do
  Membrane.Logger.debug("Succesfully activated the endpoint")
  {:ok, state}
end

Summary

Types

Membrane action that will inform RTC Engine about disabling track variant by the client.

Membrane action that will inform RTC Engine about enabling track variant by the client.

Endpoint configuration options.

Membrane action that informs engine that endpoint finished processing and should be removed.

Membrane action that will cause RTC Engine to forward supplied message to the business logic.

RTC Engine configuration options.

Membrane action that will cause RTC Engine to publish some message to all other endpoints.

Types of messages that can be published to other Endpoints.

Type of messages that need to be handled by each endpoint.

A message that the Engine sends to the endpoint when it ackowledges its ready_action_t/0

Membrane action that will mark the endpoint as ready and set its metadata. The Engine will respond with ready_ack_msg_t/0 to acknowledge your transition to ready state.

Subscription options.

Membrane action that will inform RTC Engine about track readiness.

Functions

Adds endpoint to the RTC Engine

Returns child specification for spawning under a supervisor

Returns list of the RTC Engine's endpoints.

Returns number of forwarded tracks in RTC Engine.

Returns list of the RTC Engine's tracks.

Sends message to RTC Engine endpoint. If endpoint doesn't exist message is ignored

Registers process with pid who for receiving messages from RTC Engine

Removes endpoint from the RTC Engine

Subscribes an endpoint for a track.

Subscribes an endpoint for a track asynchronously.

Terminates the engine.

Unregisters process with pid who from receiving messages from RTC Engine

Types

Link to this type

disable_track_variant_action_t()

View Source
@type disable_track_variant_action_t() ::
  {:notify_parent,
   {:disable_track_variant, Membrane.RTC.Engine.Track.id(),
    Membrane.RTC.Engine.Track.variant()}}

Membrane action that will inform RTC Engine about disabling track variant by the client.

Link to this type

enable_track_variant_action_t()

View Source
@type enable_track_variant_action_t() ::
  {:notify_parent,
   {:enable_track_variant, Membrane.RTC.Engine.Track.id(),
    Membrane.RTC.Engine.Track.variant()}}

Membrane action that will inform RTC Engine about enabling track variant by the client.

@type endpoint_options_t() :: [id: String.t(), node: node()]

Endpoint configuration options.

  • id - assigned endpoint id. If not provided it will be generated by RTC Engine.
  • node - node on which endpoint should be spawned. If not provided, current node is used.
@type finished_action_t() :: {:notify_parent, :finished}

Membrane action that informs engine that endpoint finished processing and should be removed.

Link to this type

forward_to_parent_action_t()

View Source
@type forward_to_parent_action_t() ::
  {:notify_parent, {:forward_to_parent, message :: any()}}

Membrane action that will cause RTC Engine to forward supplied message to the business logic.

@type options_t() :: [
  id: String.t(),
  trace_ctx: map(),
  display_manager?: boolean(),
  toilet_capacity: pos_integer() | nil
]

RTC Engine configuration options.

  • id is used by logger. If not provided it will be generated.
  • display_manager? - set to true if you want to limit number of tracks sent from Membrane.RTC.Engine.Endpoint.WebRTC to a browser.
  • toilet_capacity - sets capacity of buffer between engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided it will be set to 200.
@type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}

Membrane action that will cause RTC Engine to publish some message to all other endpoints.

@type publish_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:track_metadata_updated, metadata :: any()}
  | {:enable_track_variant, Membrane.RTC.Engine.Track.id(),
     Membrane.RTC.Engine.Track.variant()}
  | {:disable_track_variant, Membrane.RTC.Engine.Track.id(),
     Membrane.RTC.Engine.Track.variant()}
  | {:endpoint_metadata_updated, metadata :: any()}
  | {:tracks_priority, tracks :: list()}
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()

Types of messages that can be published to other Endpoints.

@type published_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:new_endpoint, Membrane.RTC.Engine.Endpoint.t()}
  | {:endpoint_removed, Membrane.RTC.Engine.Endpoint.id()}
  | {:track_metadata_updated, Membrane.RTC.Engine.Track.t()}
  | {:track_variant_enabled, Membrane.RTC.Engine.Track.t(),
     Membrane.RTC.Engine.Track.variant()}
  | {:track_variant_disabled, Membrane.RTC.Engine.Track.t(),
     Membrane.RTC.Engine.Track.variant()}
  | {:endpoint_metadata_updated, Membrane.RTC.Engine.Endpoint.t()}
  | {:tracks_priority, tracks :: list()}
  | ready_ack_msg_t()
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()

Type of messages that need to be handled by each endpoint.

@type ready_ack_msg_t() ::
  {:ready, other_endpoints :: [Membrane.RTC.Engine.Endpoint.t()]}

A message that the Engine sends to the endpoint when it ackowledges its ready_action_t/0

@type ready_action_t() :: {:notify_parent, :ready | {:ready, metadata :: any()}}

Membrane action that will mark the endpoint as ready and set its metadata. The Engine will respond with ready_ack_msg_t/0 to acknowledge your transition to ready state.

This action can only be used once, any further calls by an endpoint will be ignored.

@type subscription_opts_t() :: Keyword.t()

Subscription options.

Link to this type

track_ready_action_t()

View Source
@type track_ready_action_t() ::
  {:notify_parent,
   {:track_ready, Membrane.RTC.Engine.Track.id(),
    Membrane.RTC.Engine.Track.variant(), Membrane.RTC.Engine.Track.encoding()}}

Membrane action that will inform RTC Engine about track readiness.

Functions

Link to this function

add_endpoint(pid, endpoint, opts \\ [])

View Source
@spec add_endpoint(
  pid :: pid(),
  endpoint :: Membrane.ChildrenSpec.child_definition(),
  opts :: endpoint_options_t()
) :: :ok

Adds endpoint to the RTC Engine

For more information refer to endpoint_options_t/0.

Returns child specification for spawning under a supervisor

Link to this function

get_endpoints(rtc_engine)

View Source
@spec get_endpoints(rtc_engine :: pid()) :: [%{id: String.t(), type: atom()}]

Returns list of the RTC Engine's endpoints.

Link to this function

get_num_forwarded_tracks(rtc_engine)

View Source
@spec get_num_forwarded_tracks(rtc_engine :: pid()) :: integer()

Returns number of forwarded tracks in RTC Engine.

It is number of active and pending subscriptions.

@spec get_registry_name() :: atom()
@spec get_tracks(rtc_engine :: pid()) :: [Membrane.RTC.Engine.Track.t()]

Returns list of the RTC Engine's tracks.

Link to this function

message_endpoint(rtc_engine, endpoint_id, message)

View Source
@spec message_endpoint(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  message :: any()
) :: :ok

Sends message to RTC Engine endpoint. If endpoint doesn't exist message is ignored

Link to this function

register(rtc_engine, who \\ self())

View Source
@spec register(rtc_engine :: pid(), who :: pid()) :: :ok

Registers process with pid who for receiving messages from RTC Engine

Link to this function

remove_endpoint(rtc_engine, id)

View Source
@spec remove_endpoint(
  pid :: pid(),
  id :: String.t()
) :: :ok

Removes endpoint from the RTC Engine

Link to this function

start(options, process_options)

View Source
@spec start(options :: options_t(), process_options :: GenServer.options()) ::
  GenServer.on_start()
Link to this function

start_link(options, process_options)

View Source
@spec start_link(options :: options_t(), process_options :: GenServer.options()) ::
  GenServer.on_start()
Link to this function

subscribe(rtc_engine, endpoint_id, track_id, opts \\ [])

View Source
@spec subscribe(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  track_id :: Membrane.RTC.Engine.Track.id(),
  opts :: subscription_opts_t()
) :: :ok | :ignored

Subscribes an endpoint for a track.

The endpoint will be notified about track readiness in Membrane.Bin.handle_pad_added/3 callback. endpoint_id is the id of the endpoint, which wants to subscribe to the track. Possible return values are:

  • :ok - when endpoint subscribed on track successfully
  • :ignored - when subscribing was impossible because the state of the engine changed e.g: the track was already removed, or subscribing endpoint was removed
Link to this function

subscribe_async(rtc_engine, endpoint_id, track_id, opts \\ [])

View Source
@spec subscribe_async(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  track_id :: Membrane.RTC.Engine.Track.id(),
  opts :: subscription_opts_t()
) :: reference()

Subscribes an endpoint for a track asynchronously.

Sends a message to the calling process in the form of {:subscribe_result, subscribe_ref, result} after the subscription is completed, where the subscribe_ref is the reference returned upon subscription and result is the same as returned by subscribe/3.

Link to this function

terminate(engine, opts \\ [])

View Source
@spec terminate(pid(),
  timeout: timeout(),
  force?: boolean(),
  asynchronous?: boolean()
) ::
  :ok | {:ok, pid()} | {:error, :timeout}

Terminates the engine.

Accepts three options:

  • asynchronous? - if set to true, pipline termination won't be blocking and will be executed in the process, which pid is returned as function result. If set to false, engine termination will be blocking and will be executed in the process that called this function. Defaults to false.
  • timeout - tells how much time (ms) to wait for engine to get gracefully terminated. Defaults to 5000.
  • force? - if set to true and engine is still alive after timeout, engine will be killed using Process.exit/2 with reason :kill, and function will return {:error, :timeout}. If set to false and engine is still alive after timeout, function will raise an error. Defaults to false.

Returns:

  • {:ok, pid} - if option asynchronous?: true was passed.
  • :ok - if engine was gracefully terminated within timeout.
  • {:error, :timeout} - if engine was killed after a timeout.
Link to this function

unregister(rtc_engine, who \\ self())

View Source
@spec unregister(rtc_engine :: pid(), who :: pid()) :: :ok

Unregisters process with pid who from receiving messages from RTC Engine