This guide explains how to use manual demands. Make sure you are familiar with the concepts from the flow control guide and the pads guide before proceeding.
Manual demands let an element control how fast data is processed in Membrane
pipelines. The mechanism is powerful, but it makes the element responsible for
managing demand itself.
With :auto flow control, input buffers are delivered as fast as possible for as
long as the output has outstanding demand. If that is sufficient for your use
case, prefer it.
Elements with pads using manual flow control have two responsibilities:
- Output pads: produce the amount of data requested in the handle_demand/5 callback, or less if the stream ends (by returning an :end_of_stream action).
- Input pads: explicitly request data using the :demand action; data only arrives after demand has been issued.
Output pads and handle_demand
When a downstream element requests data, Membrane invokes
handle_demand/5 on the
element whose output pad with manual flow control is connected to that
downstream element. The callback receives the pad name, the demanded amount, and the
demand unit. The element is expected to produce and send that amount of data,
or less if the stream ends. It does not need to fulfill the whole demand in a
single callback invocation — it can send fewer buffers and return a
:redemand action to be called again
with the reduced outstanding demand.
Available demand units:
- :buffers means demand is expressed as a buffer count.
- :bytes means demand is expressed as a number of bytes.
- Timestamp units (:timestamp, {:timestamp, :pts}, {:timestamp, :dts}, {:timestamp, :dts_or_pts}) mean demand is expressed as a time threshold; only input pads support these. See the timestamp section below for details.
The unit in which demand_size is expressed in an output pad's handle_demand is resolved as follows:
- If the output pad declares demand_unit: :buffers or demand_unit: :bytes, that unit is used.
- Otherwise, if the linked input pad declares demand_unit: :buffers or demand_unit: :bytes, that unit is inherited.
- Otherwise (when the input pad uses a timestamp demand unit or has auto flow control), the output pad inherits :buffers.
The output pad can explicitly control which unit it receives demand in.
Timestamp-based demand units are only available on input pads; output pads must
not declare a timestamp demand_unit. If the units differ between the output
pad and the connected input pad, Membrane automatically converts demand to the
unit declared by the output pad.
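The resolution rules above can be summarised as a small pure function. This is only a sketch of the documented rules, not Membrane's implementation: the module name DemandUnitSketch, the function resolve/2, and the nil and :auto placeholders are hypothetical and exist only for illustration.

```elixir
defmodule DemandUnitSketch do
  # output_decl: demand unit declared on the output pad, or nil if none.
  # input_decl: demand unit declared on the linked input pad,
  # or :auto when that pad uses auto flow control (hypothetical placeholder).
  def resolve(output_decl, _input_decl) when output_decl in [:buffers, :bytes],
    do: output_decl

  def resolve(nil, input_decl) when input_decl in [:buffers, :bytes],
    do: input_decl

  # Timestamp units and auto flow control both fall back to :buffers.
  def resolve(nil, _timestamp_or_auto), do: :buffers
end

DemandUnitSketch.resolve(:bytes, :buffers)        # :bytes
DemandUnitSketch.resolve(nil, :bytes)             # :bytes
DemandUnitSketch.resolve(nil, {:timestamp, :pts}) # :buffers
DemandUnitSketch.resolve(nil, :auto)              # :buffers
```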
Declarative nature of handle_demand
handle_demand/5 always
receives the total current outstanding demand from downstream, not a delta.
There is no need to accumulate demand values across multiple callback invocations
— each call tells you the full amount still expected. The incoming_demand field
of the handle_demand/5
context holds the amount by which demand increased since the previous invocation,
if you need that delta.
Producing buffers on demand
A source handling demand when the unit is :buffers:
defmodule MySource do
  use Membrane.Source

  def_output_pad :output,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: _any

  @impl true
  def handle_demand(:output, demand_size, :buffers, _ctx, state) do
    buffers =
      Enum.map(1..demand_size, fn _ ->
        %Membrane.Buffer{payload: :crypto.strong_rand_bytes(256)}
      end)

    {[buffer: {:output, buffers}], state}
  end
end

A source handling demand when the unit is :bytes:
defmodule MyBytesSource do
  use Membrane.Source

  def_output_pad :output,
    flow_control: :manual,
    demand_unit: :bytes,
    accepted_format: _any

  @impl true
  def handle_demand(:output, demand_size, :bytes, _ctx, state) do
    buffer = %Membrane.Buffer{payload: :crypto.strong_rand_bytes(demand_size)}
    {[buffer: {:output, buffer}], state}
  end
end

Redemand
Sometimes producing all demanded buffers at once is not possible or not
desired — for example when the element can produce at most one buffer per invocation. In that
case, return the :redemand action
alongside the buffer. Membrane will then invoke
handle_demand/5 again
with the demand reduced by the size of what was just sent.
@impl true
def handle_demand(:output, _demand_size, :buffers, _ctx, state) do
  buffer = %Membrane.Buffer{payload: :crypto.strong_rand_bytes(256)}
  {[buffer: {:output, buffer}, redemand: :output], state}
end

If demand starts at 5, the call sequence looks like this:
handle_demand(:output, 5, :buffers, ctx, state) # → sends 1 buffer, redemands
handle_demand(:output, 4, :buffers, ctx, state) # → sends 1 buffer, redemands
handle_demand(:output, 3, :buffers, ctx, state) # → sends 1 buffer, redemands
handle_demand(:output, 2, :buffers, ctx, state) # → sends 1 buffer, redemands
handle_demand(:output, 1, :buffers, ctx, state) # → sends 1 buffer, redemands
# demand reaches 0, so handle_demand is not called again until new demand arrives

Input pads and the demand action
To receive data on an input pad with manual flow control, the element must
issue a :demand action. The demand
unit is declared in the input pad spec.
:buffers and :bytes
Example: a sink demanding buffers
Below is a sink that demands 5 buffers every second and appends their payloads to a file. The sink is contrived and exists only to demonstrate the demand mechanism.
defmodule MyDemoSink do
  use Membrane.Sink

  def_input_pad :input,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: _any

  @impl true
  def handle_playing(_ctx, state) do
    {[start_timer: {:demand_timer, Membrane.Time.seconds(1)}, demand: {:input, 5}], state}
  end

  @impl true
  def handle_tick(:demand_timer, _ctx, state) do
    {[demand: {:input, 5}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    File.write!("output.bin", buffer.payload, [:append])
    {[], state}
  end
end

Example: demanding bytes
The same sink, but demanding 100 bytes at a time instead of 5 buffers:
defmodule MyBytesDemoSink do
  use Membrane.Sink

  def_input_pad :input,
    flow_control: :manual,
    demand_unit: :bytes,
    accepted_format: _any

  @impl true
  def handle_playing(_ctx, state) do
    {[start_timer: {:demand_timer, Membrane.Time.seconds(1)}, demand: {:input, 100}], state}
  end

  @impl true
  def handle_tick(:demand_timer, _ctx, state) do
    {[demand: {:input, 100}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    File.write!("output.bin", buffer.payload, [:append])
    {[], state}
  end
end

When demanding in bytes, if a buffer boundary does not align with the requested byte count, the buffer is split. Both fragments share the same metadata and timestamps as the original buffer.
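The splitting behaviour described above can be modelled in plain Elixir. The sketch below is an illustration under stated assumptions, not Membrane's code: ByteDemandSketch and take/2 are hypothetical names, and buffers are represented as bare maps with :payload and :pts keys rather than Membrane.Buffer structs.

```elixir
defmodule ByteDemandSketch do
  # Takes up to `demand` bytes from a queue of buffer-like maps.
  # Returns {delivered, remaining}. A buffer that straddles the byte
  # boundary is split, and both halves keep the original pts.
  def take([], _demand), do: {[], []}
  def take(queue, 0), do: {[], queue}

  def take([%{payload: payload} = buffer | rest], demand)
      when byte_size(payload) <= demand do
    {delivered, remaining} = take(rest, demand - byte_size(payload))
    {[buffer | delivered], remaining}
  end

  def take([%{payload: payload} = buffer | rest], demand) do
    <<head::binary-size(demand), tail::binary>> = payload
    {[%{buffer | payload: head}], [%{buffer | payload: tail} | rest]}
  end
end

queue = [%{payload: "abcd", pts: 0}, %{payload: "efgh", pts: 10}]
{delivered, remaining} = ByteDemandSketch.take(queue, 6)
# delivered: [%{payload: "abcd", pts: 0}, %{payload: "ef", pts: 10}]
# remaining: [%{payload: "gh", pts: 10}]
```

With a demand of 6 bytes, the second 4-byte buffer is cut after 2 bytes; the leftover fragment stays queued with the same pts until more demand is issued.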
Declarative nature of demands
This section applies to :buffers and :bytes demand units. Timestamp demand
units behave differently — see the timestamp section
below.
The :demand action overwrites the current demand — it does not add to it.
Issuing demand: {:input, 5} sets the demand to 5 regardless of how much
demand was already pending.
This matters when a new demand is issued before the previous one is fully satisfied. For example, if you demand 5, receive 3 buffers, then demand 5 again:
| Event | Demand |
|---|---|
| demand: {:input, 5} | 5 |
| buffer received | 4 |
| buffer received | 3 |
| buffer received | 2 |
| demand: {:input, 5} | 5 ← overwrites 2 |
| buffer received | 4 |
| buffer received | 3 |
| buffer received | 2 |
| buffer received | 1 |
| buffer received | 0 |
Total received: 8, not 10. The remaining 2 from the first demand were
discarded when the second demand: {:input, 5} was issued.
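The table can be replayed with a small model of the demand counter. This is a sketch of the documented semantics only, not Membrane's implementation; DemandModel, replay/1 and the event shapes are hypothetical. An integer overwrites the counter, while a one-argument function (which the :demand action also accepts) computes the new value from the current one.

```elixir
defmodule DemandModel do
  # Replays events against a demand counter.
  # Returns {final_demand, buffers_received}.
  def replay(events) do
    Enum.reduce(events, {0, 0}, fn
      # An integer overwrites whatever demand was pending.
      {:demand, size}, {_demand, received} when is_integer(size) ->
        {size, received}

      # A function derives the new demand from the current one.
      {:demand, fun}, {demand, received} when is_function(fun, 1) ->
        {fun.(demand), received}

      # A buffer is delivered only while demand is positive.
      :buffer, {demand, received} when demand > 0 ->
        {demand - 1, received + 1}

      :buffer, state ->
        state
    end)
  end
end

buffers = fn n -> List.duplicate(:buffer, n) end
overwrite = [{:demand, 5}] ++ buffers.(3) ++ [{:demand, 5}] ++ buffers.(10)
accumulate = [{:demand, 5}] ++ buffers.(3) ++ [{:demand, &(&1 + 5)}] ++ buffers.(10)

DemandModel.replay(overwrite)  # {0, 8}
DemandModel.replay(accumulate) # {0, 10}
```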
If you need to accumulate demand rather than overwrite it, use the function form:
{[demand: {:input, &(&1 + 5)}], state}

Timestamp demand units
Timestamp demand units let an element request buffers by specifying a timestamp threshold rather than a count or byte size. The element receives all buffers up to and including the first one whose timestamp meets or exceeds the demanded value.
Timestamp demand units are exclusive to input pads with manual flow control. If the output pad linked to such an input pad does not declare its own demand unit, it receives demand in :buffers.
Available variants
- {:timestamp, :pts} uses each buffer's :pts field.
- {:timestamp, :dts} uses each buffer's :dts field.
- :timestamp or {:timestamp, :dts_or_pts} uses buffer.dts || buffer.pts.
Timestamp requirements
All buffers passing through an input pad with a timestamp demand unit must have
their relevant timestamp field set to a non-nil value — Membrane will raise
an error if a buffer is missing its timestamp. Additionally, timestamps must be
monotonically non-decreasing; non-monotonic timestamps will cause a warning and undefined behaviour.
For {:timestamp, :pts}, note that PTS can be non-monotonic in streams with
B-frames (see the timestamps guide). Prefer
{:timestamp, :dts} or {:timestamp, :dts_or_pts} when DTS is available.
How timestamp demands work
The demand value is a Membrane.Time.t/0 value. Use Membrane.Time
functions to construct it, for example Membrane.Time.seconds(1).
Timestamps are interpreted relative to an offset — the timestamp of the very first buffer ever received on the pad. This means you never need to know the absolute starting timestamp of the stream; you only work with durations from that origin.
For example, if the first buffer arrives with pts: Membrane.Time.seconds(100)
and you issue demand: {:input, Membrane.Time.seconds(1)}, Membrane will
deliver buffers until it reaches the first one with
pts >= Membrane.Time.seconds(100) + Membrane.Time.seconds(1), i.e.
pts >= Membrane.Time.seconds(101).
When you issue demand: {:input, t}, Membrane delivers all buffers
whose timestamp (minus the offset) is strictly less than t, plus the first
buffer whose timestamp (minus the offset) equals or exceeds t. This means
that if you demand a value strictly greater than the last received buffer's
timestamp (minus the offset), you are guaranteed to receive at least one new
buffer.
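The delivery rule above can be sketched as a pure function over a queue of timestamps. This is a model under stated assumptions, not Membrane's implementation: TimestampDemandSketch and deliver/3 are hypothetical names, and timestamps are plain integers in milliseconds instead of Membrane.Time.t/0 values.

```elixir
defmodule TimestampDemandSketch do
  # queue: timestamps of buffers waiting on the pad, in arrival order.
  # offset: timestamp of the very first buffer received on the pad.
  # threshold: the demanded value, relative to the offset.
  # Returns {delivered, remaining}.
  def deliver(queue, offset, threshold) do
    # Everything strictly below the threshold is delivered...
    {below, rest} = Enum.split_while(queue, &(&1 - offset < threshold))

    # ...plus the first buffer that reaches or exceeds it, if one is queued.
    case rest do
      [first_at_or_above | remaining] -> {below ++ [first_at_or_above], remaining}
      [] -> {below, []}
    end
  end
end

# The first buffer arrives at 100 s; buffers follow every 400 ms.
queue = [100_000, 100_400, 100_800, 101_200, 101_600]
offset = hd(queue)

{delivered, remaining} = TimestampDemandSketch.deliver(queue, offset, 1_000)
# delivered: [100_000, 100_400, 100_800, 101_200]
# remaining: [101_600]
```

Demanding 1 s delivers the three buffers below 101 s plus the first one at or past it, matching the pts >= 101 s example above.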
Incrementing demands
To receive a continuous stream in time-based chunks, always demand a value larger than the timestamp (minus the offset) of the last received buffer. Issuing a demand at or below that value will result in a warning and no buffers being delivered.
Example: a sink consuming one second of data per second
defmodule MyTimestampSink do
  use Membrane.Sink

  # For demonstration purposes only.
  def_input_pad :input,
    flow_control: :manual,
    demand_unit: {:timestamp, :pts},
    accepted_format: _any

  @impl true
  def handle_playing(_ctx, state) do
    # Map.put/3 is used because no handle_init/2 is defined here,
    # so the state does not contain a :next_demand key yet.
    {[
       start_timer: {:demand_timer, Membrane.Time.seconds(1)},
       demand: {:input, Membrane.Time.seconds(1)}
     ], Map.put(state, :next_demand, Membrane.Time.seconds(2))}
  end

  @impl true
  def handle_tick(:demand_timer, _ctx, state) do
    {[demand: {:input, state.next_demand}],
     %{state | next_demand: state.next_demand + Membrane.Time.seconds(1)}}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    File.write!("output.bin", buffer.payload, [:append])
    {[], state}
  end
end

Each timer tick advances the demanded threshold by one second: 1s, 2s,
3s, … All values are measured from the PTS of the very first buffer received
on :input, so each demand naturally covers the next one-second slice of the
stream.
Filters with manual flow control
The canonical pattern for a filter with both pads using manual flow control is:
- handle_demand/5: propagate the demand upstream by returning a :demand action on the input pad. The demand passed upstream might be modified, e.g. it is multiplied by 2 in the example below.
- handle_buffer/4: process the incoming buffer and forward it (possibly modified) downstream via a :buffer action.
The following example combines every two input buffers into one output buffer,
so to satisfy a demand of n output buffers, the filter needs n * 2 input
buffers.
defmodule MyFilter do
  use Membrane.Filter

  def_input_pad :input,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: _any

  def_output_pad :output,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: _any

  @impl true
  def handle_init(_ctx, _opts) do
    {[], %{pending: nil}}
  end

  @impl true
  def handle_demand(:output, demand_size, :buffers, _ctx, state) do
    {[demand: {:input, demand_size * 2}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, %{pending: nil} = state) do
    {[], %{state | pending: buffer}}
  end

  def handle_buffer(:input, buffer, _ctx, %{pending: first} = state) do
    output = %Membrane.Buffer{
      payload: first.payload <> buffer.payload,
      pts: first.pts,
      dts: first.dts,
      metadata: first.metadata
    }

    {[buffer: {:output, output}], %{state | pending: nil}}
  end
end

Redemand in filters
Sometimes the number of input buffers needed to satisfy a given output demand
is not known upfront. The relationship between input and output may be
non-deterministic — for instance, a filter may drop buffers based on their
content, or produce output of varying size depending on the input. In such
cases, the required input demand cannot be calculated in
handle_demand/5 alone.
Instead, the filter can use :redemand. The pattern works as follows:
- handle_demand/5 issues demand on the input pad. The value may be derived from the output demand, but does not have to match it exactly.
- handle_buffer/4 processes each arriving buffer and returns a :redemand action at the end.
- :redemand re-triggers handle_demand/5 with the current unfulfilled output demand, which in turn issues new demand upstream, and so on.
This interleaves handle_demand/5
and handle_buffer/4
calls rather than batching them. It is less efficient than computing the exact
input demand upfront, but it makes implementation straightforward when the
required amount of input data cannot be determined in advance.
As a concrete example, consider a filter that keeps only buffers whose timestamp falls on an odd second of the stream, and drops all others:
defmodule OddFilter do
  use Membrane.Filter

  def_input_pad :input,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: _any

  def_output_pad :output,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: _any

  @impl true
  def handle_demand(:output, demand_size, :buffers, _ctx, state) do
    {[demand: {:input, demand_size}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    seconds =
      Membrane.Buffer.get_dts_or_pts(buffer)
      |> Membrane.Time.as_seconds(:round)

    buffer_action =
      if rem(seconds, 2) != 0 do
        [buffer: {:output, buffer}]
      else
        []
      end

    {buffer_action ++ [redemand: :output], state}
  end
end

Do not use redemand in a filter's handle_demand
Returning :redemand from
handle_demand/5 in a
filter is illegal. Filters are designed around a clear separation of concerns:
handle_demand/5
propagates demand upstream, while
handle_buffer/4
transforms data and passes it downstream. Returning :redemand from
handle_demand/5
would cause it to re-trigger itself in a loop — and because the element
process is busy executing that loop, it cannot read messages from its mailbox,
including buffers sent by upstream elements. Those buffers would never be
delivered to
handle_buffer/4,
causing the pipeline to hang.
Auto flow control
If you are writing a filter that processes buffers one by one without needing
to control the timing or quantity of incoming data, consider auto flow control
instead. With auto flow control, Membrane manages demands automatically: when
all auto output pads have positive demand, it issues demand on all auto input
pads. The element only needs to implement handle_buffer/4.
defmodule MyAutoFilter do
  use Membrane.Filter

  def_input_pad :input,
    flow_control: :auto,
    accepted_format: _any

  def_output_pad :output,
    flow_control: :auto,
    accepted_format: _any

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    processed = %{buffer | payload: process(buffer.payload)}
    {[buffer: {:output, processed}], state}
  end

  defp process(payload) do
    for <<byte <- payload>>, into: <<>>, do: <<Bitwise.bxor(byte, 0xFF)>>
  end
end

Use manual flow control when the element needs to control the timing or quantity of incoming data explicitly, for example when producing a fixed time window of buffers, or when the ratio between input and output buffers is not fixed.