View Source Membrane.Pipeline behaviour (Membrane Core v1.0.0)
Module containing functions for constructing and supervising pipelines.
Pipelines are units that make it possible to instantiate, link and manage elements and bins in convenient way (actually they should always be used inside a pipeline). Linking pipeline children together enables them to pass data to one another, and process it in different ways.
To create a pipeline, use the __using__/1
macro and implement callbacks
of Membrane.Pipeline
behaviour. For details on instantiating and linking
children, see Membrane.ChildrenSpec
.
Starting and supervision
Pipeline can be started with start_link/2
or start/2
functions. They both
return {:ok, supervisor_pid, pipeline_pid}
in case of success, because the pipeline
is always spawned under a dedicated supervisor. The supervisor never restarts the
pipeline, but it makes sure that the pipeline and its children terminate properly.
If the pipeline needs to be restarted in case of failure, it should be spawned under
another supervisor with a proper strategy.
Starting under a supervision tree
The pipeline can be spawned under a supervision tree like any GenServer
. Also,
__using__/1
macro injects a child_spec/1
function. A simple scenario can look like:
defmodule MyPipeline do
use Membrane.Pipeline
def start_link(options) do
Membrane.Pipeline.start_link(__MODULE__, options, name: MyPipeline)
end
# ...
end
Supervisor.start_link([{MyPipeline, option: :value}], strategy: :one_for_one)
send(MyPipeline, :message)
Starting outside of a supervision tree
When starting a pipeline outside a supervision tree and willing to interact with
the pipeline by pid, pipeline_pid
returned from start_link
can be used, for example
{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(MyPipeline, option: :value)
send(pipeline_pid, :message)
Visualizing pipeline's supervision tree
Pipeline's internal supervision tree can be looked up with Applications tab of Erlang's Stalker
or with Livebook's Kino
library.
For debugging (and ONLY for debugging) purposes, you may use the following configuration:
config :membrane_core, unsafely_name_processes_for_stalker: [:components]
that makes the stalker's process tree graph more readable by naming pipeline's descendants, for example: .
Summary
Types
Defines return values from Pipeline callback functions.
Defines options that can be passed to start/3
/ start_link/3
and received
in handle_init/2
callback.
Callbacks
Callback invoked when pipeline is called using a synchronous call.
Callback invoked when a notification comes in from a child.
Callback invoked when a child removes its pad.
Callback invoked when crash of the crash group happens.
Callback invoked when a child element finishes processing stream via given pad.
Callback invoked when a child element starts processing stream via given pad.
Callback invoked when pipeline receives a message that is not recognized as an internal membrane message.
Callback invoked on initialization of pipeline.
Callback invoked when pipeline switches the playback to :playing
.
By default, it does nothing.
Callback invoked on pipeline startup, right after handle_init/2
.
Callback invoked when children of Membrane.ChildrenSpec
are started.
Callback invoked when pipeline is requested to terminate with terminate/2
.
Callback invoked upon each timer tick. A timer can be started with Membrane.Pipeline.Action.start_timer
action.
Functions
Brings all the stuff necessary to implement a pipeline.
Lists PIDs of all the pipelines currently running on the current node.
Like list_pipelines/0
, but allows to pass a node.
Checks whether module is a pipeline.
Does the same as start_link/3
but starts process outside of supervision tree.
Starts the Pipeline based on given module and links it to the current process.
Terminates the pipeline.
Types
@type callback_return() :: {[Membrane.Pipeline.Action.t()], state()}
Defines return values from Pipeline callback functions.
Return values
{[action], state}
- Return a list of actions that will be performed within the pipeline. This can be used to start new children, or to send messages to specific children, for example. Actions are a tuple of{type, arguments}
, so may be written in the form a keyword list. SeeMembrane.Pipeline.Action
for more info.
@type config() :: [config_entry()]
@type config_entry() :: {:name, name()}
@type name() :: GenServer.name()
@type pipeline_options() :: any()
Defines options that can be passed to start/3
/ start_link/3
and received
in handle_init/2
callback.
@type state() :: any()
Callbacks
@callback handle_call( message :: any(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when pipeline is called using a synchronous call.
Context passed to this callback contains additional field :from
.
By default, it does nothing.
handle_child_notification(notification, element, context, state)
View Source (optional)@callback handle_child_notification( notification :: Membrane.ChildNotification.t(), element :: Membrane.Child.name(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when a notification comes in from a child.
By default, it ignores the notification.
@callback handle_child_pad_removed( child :: Membrane.Child.name(), pad :: Membrane.Pad.ref(), context :: Membrane.Pipeline.CallbackContext.t(), state :: state() ) :: callback_return()
Callback invoked when a child removes its pad.
The callback won't be invoked, when you have initiated the pad removal,
eg. when you have returned t:Membrane.Pipeline.Action.remove_link()
action which made one of your children's pads be removed.
By default, it does nothing.
@callback handle_crash_group_down( group_name :: Membrane.Child.group(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when crash of the crash group happens.
Context passed to this callback contains 2 additional fields: :members
and :crash_initiator
.
By default, it does nothing.
handle_element_end_of_stream(child, pad, context, state)
View Source (optional)@callback handle_element_end_of_stream( child :: Membrane.Child.name(), pad :: Membrane.Pad.ref(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when a child element finishes processing stream via given pad.
By default, it does nothing.
handle_element_start_of_stream(child, pad, context, state)
View Source (optional)@callback handle_element_start_of_stream( child :: Membrane.Child.name(), pad :: Membrane.Pad.ref(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when a child element starts processing stream via given pad.
By default, it does nothing.
@callback handle_info( message :: any(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when pipeline receives a message that is not recognized as an internal membrane message.
Useful for receiving data sent from NIFs or other stuff. By default, it ignores the received message.
@callback handle_init( context :: Membrane.Pipeline.CallbackContext.t(), options :: pipeline_options() ) :: callback_return()
Callback invoked on initialization of pipeline.
This callback is synchronous: the process which started the pipeline waits until handle_init
finishes. For that reason, it's important to do any long-lasting or complex work in handle_setup/2
,
while handle_init
should be used for things like parsing options, initializing state or spawning
children.
By default, it converts the opts
to a map if they're a struct and sets them as the pipeline state.
@callback handle_playing( context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when pipeline switches the playback to :playing
.
By default, it does nothing.
@callback handle_setup( context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked on pipeline startup, right after handle_init/2
.
Any long-lasting or complex initialization should happen here. By default, it does nothing.
@callback handle_spec_started( children :: [Membrane.Child.name()], context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when children of Membrane.ChildrenSpec
are started.
By default, it does nothing.
@callback handle_terminate_request( context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked when pipeline is requested to terminate with terminate/2
.
By default, it returns Membrane.Pipeline.Action.terminate/0
with reason :normal
.
@callback handle_tick( timer_id :: any(), context :: Membrane.Pipeline.CallbackContext.t(), state() ) :: callback_return()
Callback invoked upon each timer tick. A timer can be started with Membrane.Pipeline.Action.start_timer
action.
Functions
Brings all the stuff necessary to implement a pipeline.
Options:
:bring_spec?
- if true (default) imports and aliasesMembrane.ChildrenSpec
:bring_pad?
- if true (default) requires and aliasesMembrane.Pad
@spec list_pipelines() :: [pid()]
Lists PIDs of all the pipelines currently running on the current node.
Use only for debugging purposes.
Like list_pipelines/0
, but allows to pass a node.
Checks whether module is a pipeline.
@spec start(module(), pipeline_options(), config()) :: on_start()
Does the same as start_link/3
but starts process outside of supervision tree.
start_link(module, pipeline_options \\ nil, process_options \\ [])
View Source@spec start_link(module(), pipeline_options(), config()) :: on_start()
Starts the Pipeline based on given module and links it to the current process.
Pipeline options are passed to module's handle_init/2
callback.
Note that this function returns {:ok, supervisor_pid, pipeline_pid}
in case of
success. Check the 'Starting and supervision' section of the moduledoc for details.
@spec terminate(pipeline :: pid(), timeout: timeout(), force?: boolean(), asynchronous?: boolean() ) :: :ok | {:ok, pid()} | {:error, :timeout}
Terminates the pipeline.
Accepts three options:
asynchronous?
- if set totrue
, pipline termination won't be blocking and will be executed in the process, which pid is returned as function result. If set tofalse
, pipeline termination will be blocking and will be executed in the process that called this function. Defaults tofalse
.timeout
- tells how much time (ms) to wait for pipeline to get gracefully terminated. Defaults to 5000.force?
- if set totrue
and pipeline is still alive aftertimeout
, pipeline will be killed usingProcess.exit/2
with reason:kill
, and function will return{:error, :timeout}
. If set tofalse
and pipeline is still alive aftertimeout
, function will raise an error. Defaults tofalse
.
Returns:
{:ok, pid}
- if optionasynchronous?: true
was passed.:ok
- if pipeline was gracefully terminated withintimeout
.{:error, :timeout}
- if pipeline was killed after atimeout
.