This guide outlines best practices for integrating Membrane Pipelines into your Elixir application, specifically focusing on how to attach pipelines to your application's supervision tree.
In most cases, pipelines are used in one of the following scenarios:
Static Orchestration: Maintaining a single, permanent pipeline that always runs with your application (e.g. mixing output from multiple IP cameras into a single HLS stream).
Dynamic Orchestration: Spawning pipelines on-demand based on user actions (e.g. starting a unique pipeline for every user joining a conference room).
Batch Processing: Handling "offline" tasks with a clear start and end (e.g. transcoding a stream inside an MP4 file).
Static Orchestration: Supervisor on Startup
This approach is ideal when your pipeline architecture is fixed and needs to run continuously from the moment your application starts. Imagine an application that monitors a security camera feed to detect people or vehicles in real-time. Such an application could consist of two components:
The Membrane Pipeline which connects to an RTSP stream, decodes the video, and extracts raw frames. It sends these frames to the external process (via a Unix socket or standard input), receives the transformed video back, re-encodes it, and broadcasts the final stream using HLS.
The OS Process: a Python script running a machine learning model such as RF-DETR. It reads raw video frames and performs object segmentation, coloring the pixels that correspond to detected objects.
The pipeline could look similar to this one:
defmodule MyProject.Pipeline do
use Membrane.Pipeline
@impl true
def handle_init(_ctx, rtsp_url) do
spec = [
child(:source, %Membrane.RTSP.Source{
transport: :tcp,
allowed_media_types: [:video],
stream_uri: rtsp_url,
on_connection_closed: :send_eos
})
]
{[spec: spec], %{}}
end
@impl true
def handle_child_notification({:set_up_tracks, tracks}, _child, _ctx, state) do
hls_config = %Membrane.HTTPAdaptiveStream.SinkBin{
manifest_module: Membrane.HTTPAdaptiveStream.HLS,
target_window_duration: Membrane.Time.seconds(120),
storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
directory: "output_hls"
}
}
video_track = Enum.find(tracks, &(&1.type == :video))
spec = [
get_child(:source)
|> via_out(Pad.ref(:output, video_track.control_path))
|> child(:depayloader, Membrane.H264.RTP.Depayloader)
|> child(:parser, Membrane.H264.Parser)
|> child(:decoder, Membrane.H264.FFmpeg.Decoder)
# filter talking to the side-car Python OS process running the RF-DETR model
|> child(:segmentation_filter, MyProject.ObjectSegmentationFilter)
|> child(:encoder, %Membrane.H264.FFmpeg.Encoder{preset: :fast})
|> via_in(:input, options: [encoding: :H264, segment_duration: Membrane.Time.seconds(10)])
|> child(:hls, hls_config)
]
{[spec: spec], state}
end
@impl true
def handle_child_notification(_notification, _child, _ctx, state), do: {[], state}
end

Then you could create a dedicated supervisor to manage the pipeline alongside any necessary "side-car" processes, such as a MuonTrap.Daemon wrapping an external OS command.
This allows you to treat the pipeline and its dependencies as a single unit.
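The MyProject.ObjectSegmentationFilter referenced in the pipeline above is left undefined; a minimal sketch could look like the following. Note this is illustrative only: the Unix socket path and the one-frame-in/one-frame-out exchange are assumptions, and a real filter would have to agree with the side-car on the exact frame format and framing protocol.

```elixir
defmodule MyProject.ObjectSegmentationFilter do
  @moduledoc """
  Sends each raw video frame to the side-car Python process over a Unix
  socket and emits the segmented frame it returns. Sketch only: the socket
  path and the exchange protocol are assumptions.
  """
  use Membrane.Filter

  def_input_pad :input, accepted_format: Membrane.RawVideo, flow_control: :auto
  def_output_pad :output, accepted_format: Membrane.RawVideo, flow_control: :auto

  @impl true
  def handle_init(_ctx, _opts) do
    {[], %{socket: nil}}
  end

  @impl true
  def handle_setup(_ctx, state) do
    # Connect to the side-car over a Unix domain socket (path is assumed).
    {:ok, socket} =
      :gen_tcp.connect({:local, "/tmp/rf_detr.sock"}, 0, [:binary, active: false])

    {[], %{state | socket: socket}}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    # Raw frames have a fixed size, so we read back exactly as many bytes
    # as we sent.
    :ok = :gen_tcp.send(state.socket, buffer.payload)
    {:ok, segmented} = :gen_tcp.recv(state.socket, byte_size(buffer.payload))
    {[buffer: {:output, %{buffer | payload: segmented}}], state}
  end
end
```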
defmodule MyProject.InfrastructureSupervisor do
use Supervisor
def start_link(args) do
# no :name option — multiple instances may later run under a DynamicSupervisor
Supervisor.start_link(__MODULE__, args)
end
@impl true
def init(args) do
children = [
%{
id: Pipeline,
start: {Membrane.Pipeline, :start_link, [MyProject.Pipeline, args[:rtsp_url]]},
restart: :transient
},
%{
id: SideCar,
start:
{MuonTrap.Daemon, :start_link,
["python", ["run_model.py", "rf-detr-large-2026.pth"], [log_output: :debug]]},
restart: :transient
}
]
Supervisor.init(children, strategy: :one_for_all)
end
end

Using the :one_for_all strategy ensures that the MuonTrap.Daemon gets restarted whenever the pipeline restarts, and vice versa.
In your specific scenario, you may need to adjust the :restart and :strategy
options to reflect the dependencies between supervised processes.
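The restart coupling that :one_for_all provides can be observed in isolation with throwaway Agent children standing in for the pipeline and the side-car (all names below are illustrative):

```elixir
# Two Agent children standing in for the pipeline and the side-car process.
children = [
  %{id: :pipeline, start: {Agent, :start_link, [fn -> :pipeline end]}},
  %{id: :side_car, start: {Agent, :start_link, [fn -> :side_car end]}}
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_all)

pids = fn ->
  for {id, pid, _type, _modules} <- Supervisor.which_children(sup),
      into: %{},
      do: {id, pid}
end

before_crash = pids.()

# Kill one child; with :one_for_all the supervisor restarts the sibling too.
Process.exit(before_crash.pipeline, :kill)
Process.sleep(50)

after_crash = pids.()
# Both children are now running under fresh pids.
```

With strategy: :one_for_one instead, only the killed child would have been restarted and the sibling's pid would stay the same.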
To launch this when your app boots, add the InfrastructureSupervisor to the children list in your application.ex:
defmodule MyProject.Application do
use Application
@impl true
def start(_type, _args) do
children = [
{MyProject.InfrastructureSupervisor, [rtsp_url: "rtsp://user:pass@127.0.0.1:554"]}
# ... other children required by your application
]
opts = [strategy: :one_for_one, name: MyProject.Supervisor]
Supervisor.start_link(children, opts)
end
end

Dynamic Orchestration: Spawning Pipelines under a DynamicSupervisor
While the static approach works perfectly for fixed infrastructure, many applications require more flexibility. Consider a scenario where you need to spawn a new pipeline on demand to ingest an RTSP stream from a camera provided by a user.
Since we have already defined the InfrastructureSupervisor, scaling to multiple dynamic pipelines is straightforward.
Instead of starting the InfrastructureSupervisor directly in your application tree, we add a DynamicSupervisor.
This allows us to spawn new infrastructure "units" (the pipeline + sidecars) on demand.
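The start/stop lifecycle is easy to try out with a plain Agent standing in for the whole infrastructure unit (DemoDynamicSupervisor and the child spec below are illustrative):

```elixir
{:ok, _sup} =
  DynamicSupervisor.start_link(strategy: :one_for_one, name: DemoDynamicSupervisor)

# Spawn a "unit" on demand, as you would spawn MyProject.InfrastructureSupervisor.
{:ok, child_pid} =
  DynamicSupervisor.start_child(DemoDynamicSupervisor, %{
    id: Unit,
    start: {Agent, :start_link, [fn -> :unit end]},
    restart: :transient
  })

%{active: 1} = DynamicSupervisor.count_children(DemoDynamicSupervisor)

# Tear the unit down when it is no longer needed.
:ok = DynamicSupervisor.terminate_child(DemoDynamicSupervisor, child_pid)
%{active: 0} = DynamicSupervisor.count_children(DemoDynamicSupervisor)
```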
defmodule MyProject.Application do
use Application
@impl true
def start(_type, _args) do
children = [
{DynamicSupervisor, strategy: :one_for_one, name: MyProject.PipelineDynamicSupervisor}
# ... other children required by your application
]
opts = [strategy: :one_for_one, name: MyProject.Supervisor]
Supervisor.start_link(children, opts)
end
end

Now, whenever a new pipeline is needed, you can spawn a new instance of your infrastructure using DynamicSupervisor.start_child/2.
This could happen, for example, inside the mount/3 callback of your LiveView:
def mount(params, _session, socket) do
if connected?(socket) do
{:ok, infrastructure_supervisor_pid} = DynamicSupervisor.start_child(
MyProject.PipelineDynamicSupervisor,
{MyProject.InfrastructureSupervisor, [rtsp_url: params["rtsp_url"]]}
)
{:ok, assign(socket, :infrastructure_supervisor_pid, infrastructure_supervisor_pid)}
else
{:ok, socket}
end
end

When you need to stop the pipeline, you can use DynamicSupervisor.terminate_child/2.
In the case of your LiveView component, this could happen in its terminate/2 callback:
def terminate(_reason, socket) do
if pid = socket.assigns[:infrastructure_supervisor_pid] do
DynamicSupervisor.terminate_child(MyProject.PipelineDynamicSupervisor, pid)
end
:ok
end

Batch Processing
Batch processing is ideal for "offline tasks" where you need to ensure the job completes successfully.
For example, let's assume we want to rescale H.264 video stored in an MP4 container. To achieve this, we build the pipeline as follows:
defmodule TranscodePipeline do
use Membrane.Pipeline
alias Membrane.File.{Source, Sink}
alias Membrane.H264.FFmpeg.{Decoder, Encoder}
alias Membrane.H264.Parser
alias Membrane.FFmpeg.SWScale.Converter
@impl true
def handle_init(_ctx, opts) do
spec = [
child(:source, %Source{location: opts[:input_path]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(:output, options: [kind: :video])
|> child(:in_parser, %Parser{output_stream_structure: :annexb})
|> child(:decoder, Decoder)
|> child(:converter, %Converter{output_width: 1080, output_height: 720})
|> child(:encoder, Encoder)
|> child(:out_parser, %Parser{output_stream_structure: :avc1})
|> child(:muxer, Membrane.MP4.Muxer.ISOM)
|> child(:sink, %Sink{location: opts[:output_path]})
]
{[spec: spec], %{}}
end
@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state), do: {[terminate: :normal], state}
@impl true
def handle_element_end_of_stream(_child, _pad, _ctx, state), do: {[], state}
end

To ensure reliability, we might wrap execution of the pipeline in an Oban worker.
Oban is the standard library for background job processing in Elixir. It persists jobs to your database, ensuring that your long-running transcoding tasks are fault-tolerant. If the pipeline crashes or the server restarts, Oban will automatically retry the job until it succeeds. To install Oban in your project, you can follow this installation guide.
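A minimal Oban configuration could look like this (MyProject.Repo and the :my_project app name are assumptions; adjust them to your project):

```elixir
# config/config.exs
import Config

# Assumes MyProject.Repo is your Ecto repo; the :default queue runs
# up to 10 jobs concurrently.
config :my_project, Oban,
  repo: MyProject.Repo,
  queues: [default: 10]
```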
Assuming that Oban is installed in your project and the :default queue is configured, we can define the Oban worker:
defmodule VideoTranscoderWorker do
use Oban.Worker, queue: :default, unique: [period: 60]
@impl true
def perform(%Oban.Job{args: %{"input_path" => input, "output_path" => output}}) do
{:ok, _supervisor_pid, pipeline_pid} =
Membrane.Pipeline.start_link(
TranscodePipeline,
input_path: input,
output_path: output
)
ref = Process.monitor(pipeline_pid)
receive do
{:DOWN, ^ref, :process, ^pipeline_pid, :normal} ->
:ok
end
end
@impl true
def timeout(_job), do: :timer.minutes(5)
end

Now we can enqueue a job for the worker:
%{
"input_path" => "input.mp4",
"output_path" => "output.mp4"
}
|> VideoTranscoderWorker.new()
|> Oban.insert()

When the job completes successfully, you will find the transcoded video in output.mp4.
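The monitor-and-wait pattern used in perform/1 above is plain OTP and can be exercised with any process:

```elixir
# A stand-in for the pipeline: a process that finishes its "job" and exits
# normally after a short while.
pid = spawn(fn -> Process.sleep(100) end)
ref = Process.monitor(pid)

result =
  receive do
    # A :normal exit means the work completed.
    {:DOWN, ^ref, :process, ^pid, :normal} -> :ok
    # Any other exit reason is surfaced as an error.
    {:DOWN, ^ref, :process, ^pid, reason} -> {:error, reason}
  after
    1_000 -> {:error, :timeout}
  end
```

Matching on the exit reason like this would let the worker return {:error, reason} from perform/1, so Oban records a failure and retries the job according to its retry policy.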