RTC Engine implementation.

RTC Engine is an abstraction layer responsible for linking together different types of Endpoints. From the implementation point of view, RTC Engine is a Membrane.Pipeline.


The RTC Engine works by sending messages which notify user logic about important events. To receive RTC Engine messages you have to register your process so that RTC Engine will know where to send them. All messages RTC Engine can emit are described in Membrane.RTC.Engine.Message docs.

Registering for messages

Registration can be done using register/2 e.g.

Engine.register(rtc_engine, self())

This will register your process to receive RTC Engine messages. You can register multiple processes to receive messages from an RTC Engine instance. In such a case each message will be sent to each registered process.


Endpoints are Membrane.Bins able to publish their own tracks and subscribe to tracks from other Endpoints. One can think about Endpoint as an entity responsible for handling some specific task. An Endpoint can be added and removed using add_endpoint/3 and remove_endpoint/2 respectively.

There are two types of Endpoints:

  • Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any client.
  • Client Endpoints - they are associated with some client. Associating Endpoint with client will send some Media Events to the Endpoint's Client Library e.g. one which indicates which tracks belong to which client.

Currently RTC Engine ships with the implementation of the following Endpoints:

  • Membrane.RTC.Engine.Endpoint.WebRTC which is responsible for establishing a connection with some WebRTC client (mainly browser) and exchanging media with it. WebRTC Endpoint is a Client Endpoint.
  • Membrane.RTC.Engine.Endpoint.HLS which is responsible for receiving media tracks from all other Endpoints and saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.RTSP which is responsible for connecting to a remote RTSP stream source and sending the appropriate media track to other Endpoints. RTSP Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.File which is responsible for reading track from a file, payloading it into RTP, and sending it to other Endpoints. File Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.SIP which is responsible for establishing a connection with some SIP device (e.g. phone) and exchanging media with it. SIP Endpoint is a Standalone Endpoint.
  • Membrane.RTC.Engine.Endpoint.Recording which is responsible for saving incoming tracks to pointed storages. Recording Endpoint is a Standalone Endpoint.

Each of these endpoints is available as a separate package. Refer to the appropriate package for usage examples:

User can also implement custom Endpoints, see Custom Endpoints guide.

Readiness state

Each endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.

Before it does, it:

  • will not receive notifications about other endpoints and their metadata
  • will not receive information about tracks
  • will not be able to publish any tracks
  • will not be able to update their metadata

When declaring itself as ready, the endpoint also has an opportunity to set their metadata. To mark the endpoint as active, it has to send the ready_action_t/0.


@impl true
def handle_other({:media_event, %{type: "join", metadata: metadata}}, _context, state) do
  {{:ok, notify: {:ready, metadata}}, state}

@impl true
def handle_other(:ready, _context, state) do
  Membrane.Logger.debug("Succesfully activated the endpoint")
  {:ok, state}



@type disable_track_variant_action_t() ::

@type enable_track_variant_action_t() ::

@type endpoint_options_t() :: [id: String.t(), node: node()]

  • id - assigned endpoint id. If not provided it will be generated by RTC Engine.
  • node - node on which endpoint should be spawned. If not provided, current node is used.
@type finished_action_t() :: {:notify_parent, :finished}

@type forward_to_parent_action_t() ::
  {:notify_parent, {:forward_to_parent, message :: any()}}

@type options_t() :: [
  id: String.t(),
  trace_ctx: map(),
  display_manager?: boolean(),
  toilet_capacity: pos_integer() | nil

  • id is used by logger. If not provided it will be generated.
  • display_manager? - set to true if you want to limit number of tracks sent from Membrane.RTC.Engine.Endpoint.WebRTC to a browser.
  • toilet_capacity - sets capacity of buffer between engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided it will be set to 200.
@type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}

@type publish_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:track_metadata_updated, metadata :: any()}
  | {:enable_track_variant,,
  | {:disable_track_variant,,
  | {:endpoint_metadata_updated, metadata :: any()}
  | {:tracks_priority, tracks :: list()}
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()

@type published_message_t() ::
  {:new_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:removed_tracks, [Membrane.RTC.Engine.Track.t()]}
  | {:new_endpoint, Membrane.RTC.Engine.Endpoint.t()}
  | {:endpoint_removed,}
  | {:track_metadata_updated, Membrane.RTC.Engine.Track.t()}
  | {:track_variant_enabled, Membrane.RTC.Engine.Track.t(),
  | {:track_variant_disabled, Membrane.RTC.Engine.Track.t(),
  | {:endpoint_metadata_updated, Membrane.RTC.Engine.Endpoint.t()}
  | {:tracks_priority, tracks :: list()}
  | ready_ack_msg_t()
  | Membrane.RTC.Engine.Notifications.TrackNotification.t()

@type ready_ack_msg_t() ::
  {:ready, other_endpoints :: [Membrane.RTC.Engine.Endpoint.t()]}

@type ready_action_t() :: {:notify_parent, :ready | {:ready, metadata :: any()}}

This action can only be used once, any further calls by an endpoint will be ignored.

@type subscription_opts_t() :: Keyword.t()

Link to this type


View Source
@type track_ready_action_t() ::
    Membrane.RTC.Engine.Track.variant(), Membrane.RTC.Engine.Track.encoding()}}

add_endpoint(pid, endpoint, opts \\ [])

@spec add_endpoint(
  pid :: pid(),
  endpoint :: Membrane.ChildrenSpec.child_definition(),
  opts :: endpoint_options_t()
) :: :ok

For more information refer to endpoint_options_t/0.

@spec get_endpoints(rtc_engine :: pid()) :: [%{id: String.t(), type: atom()}]

View Source
@spec get_num_forwarded_tracks(rtc_engine :: pid()) :: integer()

It is number of active and pending subscriptions.

@spec get_registry_name() :: atom()
@spec get_tracks(rtc_engine :: pid()) :: [Membrane.RTC.Engine.Track.t()]

message_endpoint(rtc_engine, endpoint_id, message)

@spec message_endpoint(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  message :: any()
) :: :ok

register(rtc_engine, who \\ self())

@spec register(rtc_engine :: pid(), who :: pid()) :: :ok

remove_endpoint(rtc_engine, id)

@spec remove_endpoint(
  pid :: pid(),
  id :: String.t()
) :: :ok

start(options, process_options)

View Source
@spec start(options :: options_t(), process_options :: GenServer.options()) ::
start_link(options, process_options)

View Source
subscribe(rtc_engine, endpoint_id, track_id, opts \\ [])

View Source
@spec subscribe(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  track_id ::,
  opts :: subscription_opts_t()
) :: :ok | :ignored

The endpoint will be notified about track readiness in Membrane.Bin.handle_pad_added/3 callback. endpoint_id is the id of the endpoint, which wants to subscribe to the track. Possible return values are:

  • :ok - when endpoint subscribed on track successfully
  • :ignored - when subscribing was impossible because the state of the engine changed e.g: the track was already removed, or subscribing endpoint was removed
subscribe_async(rtc_engine, endpoint_id, track_id, opts \\ [])

@spec subscribe_async(
  rtc_engine :: pid(),
  endpoint_id :: String.t(),
  track_id ::,
  opts :: subscription_opts_t()
) :: reference()

Sends a message to the calling process in the form of {:subscribe_result, subscribe_ref, result} after the subscription is completed, where the subscribe_ref is the reference returned upon subscription and result is the same as returned by subscribe/3.

terminate(engine, opts \\ [])

@spec terminate(pid(),
  timeout: timeout(),
  force?: boolean(),
  asynchronous?: boolean()
) ::
  :ok | {:ok, pid()} | {:error, :timeout}

Accepts three options:

  • asynchronous? - if set to true, pipline termination won't be blocking and will be executed in the process, which pid is returned as function result. If set to false, engine termination will be blocking and will be executed in the process that called this function. Defaults to false.
  • timeout - tells how much time (ms) to wait for engine to get gracefully terminated. Defaults to 5000.
  • force? - if set to true and engine is still alive after timeout, engine will be killed using Process.exit/2 with reason :kill, and function will return {:error, :timeout}. If set to false and engine is still alive after timeout, function will raise an error. Defaults to false.


  • {:ok, pid} - if option asynchronous?: true was passed.
  • :ok - if engine was gracefully terminated within timeout.
  • {:error, :timeout} - if engine was killed after a timeout.
unregister(rtc_engine, who \\ self())

@spec unregister(rtc_engine :: pid(), who :: pid()) :: :ok

