Membrane.Pipeline behaviour (Membrane Core v1.0.1)

Module containing functions for constructing and supervising pipelines.

Pipelines are units that make it possible to instantiate, link and manage elements and bins in a convenient way (in fact, elements and bins should always be used inside a pipeline). Linking pipeline children together enables them to pass data to one another and process it in different ways.

To create a pipeline, invoke the __using__/1 macro with use Membrane.Pipeline and implement the callbacks of the Membrane.Pipeline behaviour. For details on instantiating and linking children, see Membrane.ChildrenSpec.

Starting and supervision

A pipeline can be started with the start_link/3 or start/3 functions. Both return {:ok, supervisor_pid, pipeline_pid} on success, because the pipeline is always spawned under a dedicated supervisor. The supervisor never restarts the pipeline, but it makes sure that the pipeline and its children terminate properly. If the pipeline needs to be restarted in case of failure, it should be spawned under another supervisor with a proper strategy.

Starting under a supervision tree

The pipeline can be spawned under a supervision tree like any GenServer. In addition, the __using__/1 macro injects a child_spec/1 function. A simple scenario can look like this:

defmodule MyPipeline do
  use Membrane.Pipeline

  def start_link(options) do
    Membrane.Pipeline.start_link(__MODULE__, options, name: MyPipeline)
  end

  # ...
end

Supervisor.start_link([{MyPipeline, option: :value}], strategy: :one_for_one)
send(MyPipeline, :message)

Starting outside of a supervision tree

When a pipeline is started outside a supervision tree and you want to interact with it by pid, use the pipeline_pid returned from start_link, for example:

{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(MyPipeline, option: :value)
send(pipeline_pid, :message)

Visualizing pipeline's supervision tree

A pipeline's internal supervision tree can be inspected in the Applications tab of Erlang's Observer or with Livebook's Kino library. For debugging (and ONLY for debugging) purposes, you may use the following configuration:

  config :membrane_core, unsafely_name_processes_for_observer: [:components]

which makes Observer's process tree graph more readable by naming the pipeline's descendants.

Types

@type callback_return() :: {[Membrane.Pipeline.Action.t()], state()}

Defines return values from Pipeline callback functions.

Return values

  • {[action], state} - Return a list of actions that will be performed within the pipeline. This can be used, for example, to start new children or to send messages to specific children. Each action is a {type, arguments} tuple, so the list may be written in the form of a keyword list. See Membrane.Pipeline.Action for more info.
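
The keyword-list form mentioned above can be illustrated with plain Elixir, without starting a pipeline. This is only a sketch: the terminate action is the one named in the handle_terminate_request/2 description, while the state map and its frames_seen key are made up for the illustration.

```elixir
# Two spellings of the same action list: explicit tuples and keyword syntax.
explicit = [{:terminate, :normal}]
keyword = [terminate: :normal]

# Hypothetical pipeline state, used only to show the shape of a callback return.
state = %{frames_seen: 0}

# A callback return is always an {actions, state} tuple.
callback_return = {keyword, state}

IO.inspect(explicit == keyword, label: "same list?")
IO.inspect(Keyword.keyword?(keyword), label: "keyword list?")
IO.inspect(callback_return, label: "callback return")
```

An empty action list, as in {[], state}, is a valid return when a callback only updates the state.
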
@type config() :: [config_entry()]
@type config_entry() :: {:name, name()}
@type name() :: GenServer.name()
@type on_start() ::
  {:ok, supervisor_pid :: pid(), pipeline_pid :: pid()}
  | {:error, {:already_started, pid()} | term()}
@type pipeline_options() :: any()

Defines options that can be passed to start/3 / start_link/3 and received in handle_init/2 callback.

@type state() :: any()

Callbacks

handle_call(message, context, state)

(optional)
@callback handle_call(
  message :: any(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) ::
  {[
     Membrane.Pipeline.Action.common_actions()
     | Membrane.Pipeline.Action.reply()
   ], state()}

Callback invoked when pipeline is called using a synchronous call.

The context passed to this callback contains an additional field, :from. By default, the callback does nothing.
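
A minimal sketch of a synchronous call, assuming membrane_core ~> 1.0 can be fetched with Mix.install. CounterPipeline, the :increment message and the :get_count request are hypothetical names chosen for the illustration; the reply action is the one listed in the callback's return type above.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule CounterPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{count: 0}}

  # Asynchronous message: bump the counter, perform no actions.
  @impl true
  def handle_info(:increment, _ctx, state) do
    {[], %{state | count: state.count + 1}}
  end

  # Ignore any other message.
  def handle_info(_other, _ctx, state), do: {[], state}

  # Synchronous call: the :reply action sends the value back to the caller.
  @impl true
  def handle_call(:get_count, _ctx, state) do
    {[reply: state.count], state}
  end
end

{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(CounterPipeline)
send(pipeline_pid, :increment)
count = Membrane.Pipeline.call(pipeline_pid, :get_count)
IO.inspect(count, label: "count")
:ok = Membrane.Pipeline.terminate(pipeline_pid)
```

The caller blocks in Membrane.Pipeline.call/3 until the pipeline returns a reply action or the timeout elapses.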

handle_child_notification(notification, element, context, state)

(optional)
@callback handle_child_notification(
  notification :: Membrane.ChildNotification.t(),
  element :: Membrane.Child.name(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when a notification comes in from a child.

By default, it ignores the notification.
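
As a sketch of overriding this callback, the pipeline below records the latest notification per child. It assumes membrane_core ~> 1.0 is installable with Mix.install; the {:progress, percent} notification shape and the ProgressPipeline module are hypothetical, since the notification format is defined by whichever child sends it.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule ProgressPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{progress: %{}}}

  # {:progress, percent} is a hypothetical notification a child might send.
  @impl true
  def handle_child_notification({:progress, percent}, child, _ctx, state) do
    {[], put_in(state, [:progress, child], percent)}
  end

  # Ignore every other notification, mirroring the default behaviour.
  def handle_child_notification(_notification, _child, _ctx, state) do
    {[], state}
  end
end

# The callback is an ordinary function, so it can be exercised directly.
# The empty map stands in for the callback context, which this sketch ignores.
result =
  ProgressPipeline.handle_child_notification({:progress, 40}, :decoder, %{}, %{progress: %{}})

IO.inspect(result, label: "callback return")
```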

handle_child_pad_removed(child, pad, context, state)

(optional)
@callback handle_child_pad_removed(
  child :: Membrane.Child.name(),
  pad :: Membrane.Pad.ref(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state :: state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when a child removes its pad.

The callback won't be invoked when you have initiated the pad removal yourself, e.g. when you have returned a Membrane.Pipeline.Action.remove_link() action that caused one of your children's pads to be removed. By default, it does nothing.

handle_crash_group_down(group_name, context, state)

(optional)
@callback handle_crash_group_down(
  group_name :: Membrane.Child.group(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when a crash group crashes.

The context passed to this callback contains two additional fields: :members and :crash_initiator. By default, the callback does nothing.
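
The sketch below shows one way to use the two extra context fields, assuming membrane_core ~> 1.0 is installable with Mix.install. CrashAwarePipeline and the crashed_groups state key are hypothetical; only the :members and :crash_initiator fields come from the description above.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule CrashAwarePipeline do
  use Membrane.Pipeline
  require Logger

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{crashed_groups: []}}

  @impl true
  def handle_crash_group_down(group_name, ctx, state) do
    Logger.warning(
      "Crash group #{inspect(group_name)} went down, " <>
        "members: #{inspect(ctx.members)}, initiator: #{inspect(ctx.crash_initiator)}"
    )

    # Remember which groups crashed; respawning them would be done with further actions.
    {[], %{state | crashed_groups: [group_name | state.crashed_groups]}}
  end
end

# Direct invocation with a hand-built stand-in for the callback context.
fake_ctx = %{members: [:source, :decoder], crash_initiator: :decoder}

result =
  CrashAwarePipeline.handle_crash_group_down(:input_group, fake_ctx, %{crashed_groups: []})

IO.inspect(result, label: "callback return")
```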

handle_element_end_of_stream(child, pad, context, state)

(optional)
@callback handle_element_end_of_stream(
  child :: Membrane.Child.name(),
  pad :: Membrane.Pad.ref(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when a child element finishes processing stream via given pad.

By default, it does nothing.
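
A common use of this callback is to shut the pipeline down once the last element has consumed the whole stream. The following is a sketch under assumptions: membrane_core ~> 1.0 is installable with Mix.install, and :sink is a hypothetical child name that a real pipeline would have spawned in its spec.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule FiniteStreamPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{}}

  # :sink is a hypothetical child name; terminate once it reaches end of stream.
  @impl true
  def handle_element_end_of_stream(:sink, _pad, _ctx, state) do
    {[terminate: :normal], state}
  end

  # End of stream on any other child is ignored.
  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end
end

# Direct invocation; the empty map stands in for the unused callback context.
on_sink = FiniteStreamPipeline.handle_element_end_of_stream(:sink, :input, %{}, %{})
on_other = FiniteStreamPipeline.handle_element_end_of_stream(:filter, :input, %{}, %{})

IO.inspect(on_sink, label: "sink finished")
IO.inspect(on_other, label: "other child finished")
```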

handle_element_start_of_stream(child, pad, context, state)

(optional)
@callback handle_element_start_of_stream(
  child :: Membrane.Child.name(),
  pad :: Membrane.Pad.ref(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when a child element starts processing stream via given pad.

By default, it does nothing.

handle_info(message, context, state)

(optional)
@callback handle_info(
  message :: any(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when pipeline receives a message that is not recognized as an internal membrane message.

Useful for receiving data sent from NIFs or from other sources outside of Membrane. By default, it logs and ignores the received message.

handle_init(context, options)

(optional)
@callback handle_init(
  context :: Membrane.Pipeline.CallbackContext.t(),
  options :: pipeline_options()
) ::
  {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked on initialization of pipeline.

This callback is synchronous: the process which started the pipeline waits until handle_init finishes. For that reason, any long-lasting or complex work should be done in handle_setup/2, while handle_init should be limited to things like parsing options, initializing state or spawning children. By default, it converts the options to a map if they are a struct and sets them as the pipeline state.

handle_playing(context, state)

(optional)
@callback handle_playing(
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when pipeline switches the playback to :playing. By default, it does nothing.

handle_setup(context, state)

(optional)
@callback handle_setup(
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked on pipeline startup, right after handle_init/2.

Any long-lasting or complex initialization should happen here. By default, it does nothing.
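
The division of work between handle_init/2 and handle_setup/2 can be sketched as follows, assuming membrane_core ~> 1.0 is installable with Mix.install. SplitInitPipeline, the :path option and build_index/1 are hypothetical; build_index/1 merely simulates an expensive step with a short sleep.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule SplitInitPipeline do
  use Membrane.Pipeline

  # Cheap work only: the process that starts the pipeline waits for this to return.
  @impl true
  def handle_init(_ctx, opts) do
    {[], %{path: Keyword.fetch!(opts, :path), index: nil}}
  end

  # Potentially slow work goes here, after the starting process has been released.
  @impl true
  def handle_setup(_ctx, state) do
    {[], %{state | index: build_index(state.path)}}
  end

  # Hypothetical stand-in for an expensive initialization step.
  defp build_index(path) do
    Process.sleep(100)
    %{source: path, entries: []}
  end
end

# Direct invocation of both callbacks; the empty map stands in for the context.
{[], initial_state} = SplitInitPipeline.handle_init(%{}, path: "input.h264")
{[], ready_state} = SplitInitPipeline.handle_setup(%{}, initial_state)

IO.inspect(initial_state, label: "after handle_init")
IO.inspect(ready_state, label: "after handle_setup")
```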

handle_spec_started(children, context, state)

(optional)
@callback handle_spec_started(
  children :: [Membrane.Child.name()],
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when children of Membrane.ChildrenSpec are started.

By default, it does nothing.

handle_terminate_request(context, state)

(optional)
@callback handle_terminate_request(
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) ::
  {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked when pipeline is requested to terminate with terminate/2.

By default, it returns a Membrane.Pipeline.Action.terminate() action with reason :normal.
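
Overriding this callback makes it possible to postpone termination until some cleanup has finished. The sketch below assumes membrane_core ~> 1.0 is installable with Mix.install; DrainingPipeline, the terminating? flag and the :flushed message are hypothetical. Note that terminate/2 still applies its own timeout while the pipeline is draining.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule DrainingPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{terminating?: false}}

  # Do not terminate yet; only remember that termination was requested.
  @impl true
  def handle_terminate_request(_ctx, state) do
    {[], %{state | terminating?: true}}
  end

  # :flushed is a hypothetical message signalling that cleanup has finished.
  @impl true
  def handle_info(:flushed, _ctx, %{terminating?: true} = state) do
    {[terminate: :normal], state}
  end

  def handle_info(_message, _ctx, state), do: {[], state}
end

# Direct invocation of the callbacks; the empty map stands in for the context.
{[], requested} = DrainingPipeline.handle_terminate_request(%{}, %{terminating?: false})
after_flush = DrainingPipeline.handle_info(:flushed, %{}, requested)

IO.inspect(requested, label: "after terminate request")
IO.inspect(after_flush, label: "after :flushed")
```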

handle_tick(timer_id, context, state)

(optional)
@callback handle_tick(
  timer_id :: any(),
  context :: Membrane.Pipeline.CallbackContext.t(),
  state()
) :: {[Membrane.Pipeline.Action.common_actions()], state()}

Callback invoked upon each timer tick. A timer can be started with Membrane.Pipeline.Action.start_timer action.
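
A sketch of a periodic timer, assuming membrane_core ~> 1.0 is installable with Mix.install. HeartbeatPipeline, the :heartbeat timer id and the three-tick limit are hypothetical. The stop_timer action is not described on this page and is used here on the assumption that it is available alongside start_timer in Membrane.Pipeline.Action.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule HeartbeatPipeline do
  use Membrane.Pipeline

  # Hypothetical limit after which the timer is stopped.
  @max_ticks 3

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{ticks: 0}}

  # Start a timer identified by :heartbeat that ticks every second.
  @impl true
  def handle_playing(_ctx, state) do
    {[start_timer: {:heartbeat, Membrane.Time.seconds(1)}], state}
  end

  # Count ticks and stop the timer once the limit is reached.
  @impl true
  def handle_tick(:heartbeat, _ctx, state) do
    ticks = state.ticks + 1
    actions = if ticks >= @max_ticks, do: [stop_timer: :heartbeat], else: []
    {actions, %{state | ticks: ticks}}
  end
end

# Direct invocation of the tick callback; the empty map stands in for the context.
first_tick = HeartbeatPipeline.handle_tick(:heartbeat, %{}, %{ticks: 0})
last_tick = HeartbeatPipeline.handle_tick(:heartbeat, %{}, %{ticks: 2})

IO.inspect(first_tick, label: "first tick")
IO.inspect(last_tick, label: "last tick")
```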

Functions

__using__(options)

(macro)

Brings in everything necessary to implement a pipeline.

Options:

call(pipeline, message, timeout \\ 5000)

@spec call(pid(), any(), timeout()) :: term()

@spec list_pipelines() :: [pid()]

Lists PIDs of all the pipelines currently running on the current node.

Use only for debugging purposes.

@spec list_pipelines(node()) :: [pid()]

Like list_pipelines/0, but accepts the node whose pipelines should be listed.

@spec pipeline?(module()) :: boolean()

Checks whether module is a pipeline.
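
The introspection helpers above can be tried out as in the following sketch, which assumes membrane_core ~> 1.0 is installable with Mix.install. InspectablePipeline is a hypothetical, empty pipeline module defined only for the demonstration.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule InspectablePipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{}}
end

# pipeline?/1 works on modules, not on running processes.
is_pipeline = Membrane.Pipeline.pipeline?(InspectablePipeline)
is_string_pipeline = Membrane.Pipeline.pipeline?(String)

# list_pipelines/0 works on running processes of the current node.
{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(InspectablePipeline)
listed? = pipeline_pid in Membrane.Pipeline.list_pipelines()
:ok = Membrane.Pipeline.terminate(pipeline_pid)

IO.inspect(is_pipeline, label: "InspectablePipeline is a pipeline")
IO.inspect(is_string_pipeline, label: "String is a pipeline")
IO.inspect(listed?, label: "running pipeline was listed")
```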

start(module, pipeline_options \\ nil, process_options \\ [])

@spec start(module(), pipeline_options(), config()) :: on_start()

Does the same as start_link/3, but starts the process outside of a supervision tree.

start_link(module, pipeline_options \\ nil, process_options \\ [])

@spec start_link(module(), pipeline_options(), config()) :: on_start()

Starts the Pipeline based on given module and links it to the current process.

Pipeline options are passed to module's handle_init/2 callback. Note that this function returns {:ok, supervisor_pid, pipeline_pid} in case of success. Check the 'Starting and supervision' section of the moduledoc for details.

terminate(pipeline, opts \\ [])

@spec terminate(pipeline :: pid(),
  timeout: timeout(),
  force?: boolean(),
  asynchronous?: boolean()
) ::
  :ok | {:ok, pid()} | {:error, :timeout}

Terminates the pipeline.

Accepts three options:

  • asynchronous? - if set to true, pipeline termination won't be blocking and will be executed in a separate process, whose pid is returned as the function result. If set to false, pipeline termination will be blocking and will be executed in the process that called this function. Defaults to false.
  • timeout - how long (in ms) to wait for the pipeline to terminate gracefully. Defaults to 5000.
  • force? - if set to true and the pipeline is still alive after the timeout, the pipeline will be killed using Process.exit/2 with reason :kill, and the function will return {:error, :timeout}. If set to false and the pipeline is still alive after the timeout, the function will raise an error. Defaults to false.

Returns:

  • {:ok, pid} - if option asynchronous?: true was passed.
  • :ok - if pipeline was gracefully terminated within timeout.
  • {:error, :timeout} - if pipeline was killed after a timeout.
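
The three ways of calling terminate/2 can be sketched as follows, assuming membrane_core ~> 1.0 is installable with Mix.install. ShortLivedPipeline is a hypothetical, empty pipeline that terminates immediately on request, and the 2000 ms timeout is an arbitrary value.

```elixir
Mix.install([{:membrane_core, "~> 1.0"}])

defmodule ShortLivedPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{}}
end

# Blocking termination with a custom timeout.
{:ok, _supervisor_pid, first} = Membrane.Pipeline.start(ShortLivedPipeline)
blocking_result = Membrane.Pipeline.terminate(first, timeout: 2_000)

# Non-blocking termination: the returned pid belongs to the process doing the work.
{:ok, _supervisor_pid, second} = Membrane.Pipeline.start(ShortLivedPipeline)
{:ok, terminator_pid} = Membrane.Pipeline.terminate(second, asynchronous?: true)

# Forced termination: kill the pipeline instead of raising if the timeout passes.
{:ok, _supervisor_pid, third} = Membrane.Pipeline.start(ShortLivedPipeline)
forced_result = Membrane.Pipeline.terminate(third, timeout: 2_000, force?: true)

IO.inspect(blocking_result, label: "blocking")
IO.inspect(terminator_pid, label: "asynchronous")
IO.inspect(forced_result, label: "forced")
```

With force?: true, callers should be prepared to handle {:error, :timeout} as well as :ok.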