Membrane.Pipeline behaviour (Membrane Core v0.11.0)

Module containing functions for constructing and supervising pipelines.

Pipelines are units that make it possible to instantiate, link and manage elements and bins in a convenient way (in fact, elements and bins should always be used inside a pipeline). Linking pipeline children together enables them to pass data to one another and process it in different ways.

To create a pipeline, use the __using__/1 macro and implement the callbacks of the Membrane.Pipeline behaviour. For details on instantiating and linking children, see Membrane.ChildrenSpec.

Starting and supervision

A pipeline can be started with the start_link/3 or start/3 function. Both return {:ok, supervisor_pid, pipeline_pid} on success, because the pipeline is always spawned under a dedicated supervisor. That supervisor never restarts the pipeline, but it makes sure that the pipeline and its children terminate properly. If the pipeline needs to be restarted after a failure, it should be spawned under another supervisor with a proper strategy.

Starting under a supervision tree

The pipeline can be spawned under a supervision tree like any GenServer. The __using__/1 macro also injects a child_spec/1 function. A simple scenario can look like:

defmodule MyPipeline do
  use Membrane.Pipeline

  def start_link(options) do
    # The pipeline module goes first, then the pipeline options, then the process options.
    Membrane.Pipeline.start_link(__MODULE__, options, name: MyPipeline)
  end

  # ...
end

Supervisor.start_link([{MyPipeline, option: :value}], strategy: :one_for_one)
send(MyPipeline, :message)

Starting outside of a supervision tree

When a pipeline is started outside a supervision tree, the pipeline_pid returned from start_link can be used to interact with it, for example:

{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(MyPipeline, option: :value)
send(pipeline_pid, :message)

Visualizing pipeline's supervision tree

The pipeline's internal supervision tree can be inspected in the Applications tab of Erlang's Observer or with Livebook's Kino library. For debugging (and ONLY for debugging) purposes, you may use the following configuration:

  config :membrane_core, unsafely_name_processes_for_observer: [:components]

This makes the Observer's process tree graph more readable by naming the pipeline's descendants. [Figure: Observer graph]

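Summary

Types

callback_return_t()
Defines return values from Pipeline callback functions.

pipeline_options()
Defines options that can be passed to start/3 / start_link/3 and received in handle_init/2 callback.

Callbacks

handle_call(message, context, state)
Callback invoked when pipeline is called using a synchronous call.

handle_child_notification(notification, element, context, state)
Callback invoked when a notification comes in from an element.

handle_crash_group_down(group_name, context, state)
Callback invoked when crash of the crash group happens.

handle_element_end_of_stream(child, pad, context, state)
Callback invoked when a child element finishes processing stream via given pad.

handle_element_start_of_stream(child, pad, context, state)
Callback invoked when a child element starts processing stream via given pad.

handle_info(message, context, state)
Callback invoked when pipeline receives a message that is not recognized as an internal membrane message.

handle_init(context, options)
Callback invoked on initialization of pipeline.

handle_playing(context, state)
Callback invoked when pipeline switches the playback to :playing.

handle_setup(context, state)
Callback invoked on pipeline startup, right after handle_init/2.

handle_spec_started(children, context, state)
Callback invoked when children of Membrane.ChildrenSpec are started.

handle_terminate_request(context, state)
Callback invoked when pipeline is requested to terminate with terminate/2.

handle_tick(timer_id, context, state)
Callback invoked upon each timer tick. A timer can be started with Membrane.Pipeline.Action.start_timer_t action.

Functions

__using__(options)
Brings all the stuff necessary to implement a pipeline.

pipeline?(module)
Checks whether module is a pipeline.

start(module, pipeline_options \\ nil, process_options \\ [])
Does the same as start_link/3 but starts process outside of supervision tree.

start_link(module, pipeline_options \\ nil, process_options \\ [])
Starts the Pipeline based on given module and links it to the current process.

terminate(pipeline, opts \\ [])
Gracefully terminates the pipeline.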
Types

@type callback_return_t() :: {[Membrane.Pipeline.Action.t()], state()}

Defines return values from Pipeline callback functions.

Return values

  • {[action], state} - Returns a list of actions that will be performed within the pipeline. This can be used, for example, to start new children or to send messages to specific children. Each action is a tuple of {type, arguments}, so the list may be written in the form of a keyword list. See Membrane.Pipeline.Action for more info.

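The keyword-list form mentioned above can be illustrated with plain Elixir. This is a minimal sketch: the action names are taken from the Membrane.Pipeline.Action types referenced elsewhere in this document, while the payload shapes (a {timer_id, interval} tuple and a bare reason) are assumptions made for illustration only.

```elixir
# Two spellings of the same action list.
# ASSUMPTION: the payload shapes below are illustrative, not confirmed here.
as_tuples = [{:start_timer, {:heartbeat, 1_000}}, {:terminate, :normal}]
as_keywords = [start_timer: {:heartbeat, 1_000}, terminate: :normal]

# A keyword list is just a list of {atom, value} tuples, so both terms are equal.
true = as_tuples == as_keywords

# A callback return value pairs the action list with the (possibly updated) state.
state = %{ticks: 0}
callback_return = {as_keywords, state}
```

Because both spellings are the same term, the choice between them is purely stylistic.
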
@type config() :: [config_entry()]

@type config_entry() :: {:name, name()}

@type name() :: GenServer.name()

@type on_start() ::
  {:ok, supervisor_pid :: pid(), pipeline_pid :: pid()}
  | {:error, {:already_started, pid()} | term()}

@type pipeline_options() :: any()

Defines options that can be passed to start/3 / start_link/3 and received in handle_init/2 callback.

@type state() :: any()

Callbacks

handle_call(message, context, state)

(optional)
@callback handle_call(
  message :: any(),
  context :: Membrane.Pipeline.CallbackContext.Call.t(),
  state()
) :: callback_return_t()

Callback invoked when the pipeline is called using a synchronous call (see call/3).

handle_child_notification(notification, element, context, state)

(optional)
@callback handle_child_notification(
  notification :: Membrane.ChildNotification.t(),
  element :: Membrane.Child.name_t(),
  context :: Membrane.Pipeline.CallbackContext.ChildNotification.t(),
  state()
) :: callback_return_t()

Callback invoked when a notification comes in from an element.

handle_crash_group_down(group_name, context, state)

(optional)
@callback handle_crash_group_down(
  group_name :: Membrane.CrashGroup.name_t(),
  context :: Membrane.Pipeline.CallbackContext.CrashGroupDown.t(),
  state()
) :: callback_return_t()

Callback invoked when a crash group crashes.

handle_element_end_of_stream(child, pad, context, state)

(optional)
@callback handle_element_end_of_stream(
  child :: Membrane.Child.name_t(),
  pad :: Membrane.Pad.ref_t(),
  context :: Membrane.Pipeline.CallbackContext.StreamManagement.t(),
  state()
) :: callback_return_t()

Callback invoked when a child element finishes processing a stream via the given pad.

handle_element_start_of_stream(child, pad, context, state)

(optional)
@callback handle_element_start_of_stream(
  child :: Membrane.Child.name_t(),
  pad :: Membrane.Pad.ref_t(),
  context :: Membrane.Pipeline.CallbackContext.StreamManagement.t(),
  state()
) :: callback_return_t()

Callback invoked when a child element starts processing a stream via the given pad.

handle_info(message, context, state)

(optional)
@callback handle_info(
  message :: any(),
  context :: Membrane.Pipeline.CallbackContext.Info.t(),
  state()
) :: callback_return_t()

Callback invoked when the pipeline receives a message that is not recognized as an internal Membrane message.

Useful for receiving data sent from NIFs or other sources outside the pipeline.
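As a minimal sketch of how such messages might be handled, the pipeline below collects payloads sent to it with send/2. The module name MyApp.InfoPipeline and the {:external_event, payload} message shape are hypothetical, and the code assumes membrane_core ~> 0.11 is available as a dependency.

```elixir
# Assumes membrane_core ~> 0.11 is a dependency of the project.
defmodule MyApp.InfoPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _options) do
    # No actions; start with an empty list of received payloads.
    {[], %{received: []}}
  end

  @impl true
  def handle_info({:external_event, payload}, _ctx, state) do
    # Hypothetical message shape: remember the payload, return no actions.
    {[], %{state | received: [payload | state.received]}}
  end

  def handle_info(_other, _ctx, state) do
    # Ignore anything the pipeline does not understand.
    {[], state}
  end
end
```

With a running pipeline, send(pipeline_pid, {:external_event, :some_payload}) would be delivered to the first clause.
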

handle_init(context, options)

(optional)
@callback handle_init(
  context :: Membrane.Pipeline.CallbackContext.Init.t(),
  options :: pipeline_options()
) ::
  callback_return_t()

Callback invoked on initialization of pipeline.

This callback is synchronous: the process which started the pipeline waits until handle_init finishes. For that reason, it's important to do any long-lasting or complex work in handle_setup/2, while handle_init should be used for things like parsing options, initializing state or spawning children.
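The split between the two callbacks could look roughly like the sketch below. The module name MyApp.IndexingPipeline, the :dir option, and the list_files/1 helper are hypothetical stand-ins for real option parsing and expensive initialization; the code assumes membrane_core ~> 0.11 is available as a dependency.

```elixir
# Assumes membrane_core ~> 0.11 is a dependency of the project.
defmodule MyApp.IndexingPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, options) do
    # Cheap, synchronous work only: the process that started the pipeline
    # is blocked until this callback returns.
    state = %{dir: Keyword.fetch!(options, :dir), files: nil}
    {[], state}
  end

  @impl true
  def handle_setup(_ctx, state) do
    # Potentially slow work goes here, after the caller has been unblocked.
    {[], %{state | files: list_files(state.dir)}}
  end

  # Hypothetical helper standing in for long-lasting initialization.
  defp list_files(dir), do: dir |> File.ls!() |> Enum.sort()
end
```
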

handle_playing(context, state)

(optional)
@callback handle_playing(
  context :: Membrane.Pipeline.CallbackContext.Playing.t(),
  state()
) :: callback_return_t()

Callback invoked when the pipeline switches its playback to :playing.

handle_setup(context, state)

(optional)
@callback handle_setup(
  context :: Membrane.Pipeline.CallbackContext.Setup.t(),
  state()
) :: callback_return_t()

Callback invoked on pipeline startup, right after handle_init/2.

Any long-lasting or complex initialization should happen here.

handle_spec_started(children, context, state)

(optional)
@callback handle_spec_started(
  children :: [Membrane.Child.name_t()],
  context :: Membrane.Pipeline.CallbackContext.SpecStarted.t(),
  state()
) :: callback_return_t()

Callback invoked when children of Membrane.ChildrenSpec are started.

handle_terminate_request(context, state)

(optional)
@callback handle_terminate_request(
  context :: Membrane.Pipeline.CallbackContext.TerminateRequest.t(),
  state()
) :: callback_return_t()

Callback invoked when the pipeline is requested to terminate with terminate/2.

By default, it returns the Membrane.Pipeline.Action.terminate_t/0 action with reason :normal.

handle_tick(timer_id, context, state)

(optional)
@callback handle_tick(
  timer_id :: any(),
  context :: Membrane.Pipeline.CallbackContext.Tick.t(),
  state()
) :: callback_return_t()

Callback invoked upon each timer tick. A timer can be started with the Membrane.Pipeline.Action.start_timer_t action.
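A minimal sketch of a timer-driven pipeline follows. It assumes that the start_timer action takes a {timer_id, interval} tuple and that the interval is expressed with Membrane.Time; check Membrane.Pipeline.Action for the exact shape. The module name MyApp.TickingPipeline and the :heartbeat timer id are hypothetical, and the code assumes membrane_core ~> 0.11 is available as a dependency.

```elixir
# Assumes membrane_core ~> 0.11 is a dependency of the project.
defmodule MyApp.TickingPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _options) do
    {[], %{ticks: 0}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    # ASSUMPTION: start_timer accepts {timer_id, interval}.
    {[start_timer: {:heartbeat, Membrane.Time.seconds(1)}], state}
  end

  @impl true
  def handle_tick(:heartbeat, _ctx, state) do
    # Count the ticks; no actions are returned.
    {[], %{state | ticks: state.ticks + 1}}
  end
end
```
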

Functions

__using__(options)

(macro)

Brings in everything necessary to implement a pipeline.

Options:

call(pipeline, message, timeout \\ 5000)

@spec call(pid(), any(), integer()) :: :ok

pipeline?(module)

@spec pipeline?(module()) :: boolean()

Checks whether module is a pipeline.

start(module, pipeline_options \\ nil, process_options \\ [])

@spec start(module(), pipeline_options(), config()) :: on_start()

Does the same as start_link/3, but starts the process outside of a supervision tree.

start_link(module, pipeline_options \\ nil, process_options \\ [])

@spec start_link(module(), pipeline_options(), config()) :: on_start()

Starts a pipeline based on the given module and links it to the current process.

Pipeline options are passed to module's handle_init/2 callback. Note that this function returns {:ok, supervisor_pid, pipeline_pid} in case of success. Check the 'Starting and supervision' section of the moduledoc for details.

terminate(pipeline, opts \\ [])

@spec terminate(pipeline :: pid(), Keyword.t()) :: :ok | {:error, :timeout}

Gracefully terminates the pipeline.

Accepts two options:

  • blocking? - tells whether to stop the pipeline synchronously
  • timeout - if blocking? is set to true, tells how long (in ms) to wait for the pipeline to terminate. Defaults to 5000.
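The two options and both return values from the spec above can be combined as in the sketch below. The MyApp.Shutdown module, its stop/2 function, and the returned atoms are hypothetical; only the terminate/2 call, its options, and its return values come from this document. The call itself assumes membrane_core ~> 0.11 is available at runtime.

```elixir
defmodule MyApp.Shutdown do
  # Hypothetical helper: waits up to `timeout_ms` for the pipeline to stop.
  def stop(pipeline_pid, timeout_ms \\ 10_000) do
    case Membrane.Pipeline.terminate(pipeline_pid, blocking?: true, timeout: timeout_ms) do
      # The pipeline terminated within the timeout.
      :ok -> :terminated
      # The pipeline was still terminating when the timeout elapsed.
      {:error, :timeout} -> :still_running
    end
  end
end
```

A caller could then decide whether to wait longer or to escalate when :still_running is returned.
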