Using ExVision with Membrane

Mix.install(
  [
    :ex_vision,
    :image,
    :membrane_core,
    :membrane_file_plugin,
    :membrane_flv_plugin,
    :membrane_h26x_plugin,
    :membrane_h264_ffmpeg_plugin,
    :membrane_ffmpeg_swscale_plugin,
    {:membrane_mp4_plugin, "~> 0.34.2"},
    :kino,
    :kino_membrane
  ],
  config: [
    nx: [default_backend: EXLA.Backend]
  ]
)

Introduction

In this example, we will showcase ExVision by integrating it into a media processing pipeline built with the Membrane Framework. This livebook can be treated as a tutorial on this process.

Prerequisites

  • We will be using Membrane Framework, so basic familiarity with this framework is highly recommended
  • Basic familiarity with ExVision

Contents of this tutorial

You will learn how to write a Membrane Filter element that makes use of one of ExVision's models, using object detection as an example.

In particular, we will implement a bird detector.

Integrate with Membrane

The main part of integrating with Membrane is implementing a Filter - an element which is responsible for applying a transformation on each frame in the stream.

But before we dive into the code, here are a few tips that will make it both easier to understand and easier to modify for your own use case:

  • It's useful to constrain an accepted format on input and output pads to %Membrane.RawVideo{pixel_format: :RGB}.

    This format is equivalent to a stream of raw frames in RGB format, which is what most models are trained to accept. By setting this constraint, Membrane will be able to perform a sanity check that highlights some obvious errors in the processing pipeline.

  • Model should be loaded in the handle_setup/2 callback and stored in the element state.

    It may be tempting to initialize the model in handle_init/2 but it will delay the initialization of the pipeline, as it runs in the pipeline process, not the element process

Writing the Membrane Element

With that knowledge, let's implement the Membrane Filter that will be responsible for:

  1. initialization of the detection model
  2. feeding the frames through the detector
  3. drawing the boxes indicating the detected birds in the resulting image, using the :image library
defmodule Membrane.ExVision.Detector do
  @moduledoc """
  Membrane filter that runs an object detection model on every RGB frame
  and draws a red rectangle around each detection labelled `:bird`.
  """
  use Membrane.Filter

  alias ExVision.ObjectDetection.Ssdlite320_MobileNetv3, as: Model
  alias ExVision.Types.BBox

  # A detection must score strictly above this value to be drawn
  @min_score 0.3

  # Define both input and output pads
  # On both, we want to have raw image in RGB
  def_input_pad(:input,
    accepted_format: %Membrane.RawVideo{pixel_format: :RGB},
    flow_control: :auto
  )

  def_output_pad(:output,
    accepted_format: %Membrane.RawVideo{pixel_format: :RGB},
    flow_control: :auto
  )

  defmodule State do
    @moduledoc """
    A struct describing the state of the detector element
    """
    defstruct [:detector]

    # `detector` holds the name under which the model process was registered
    # in `handle_setup/2`; it is `nil` until then
    @type t() :: %__MODULE__{
            detector: atom() | nil
          }
  end

  @impl true
  def handle_init(_ctx, _opts) do
    {[], %State{}}
  end

  # Model initialization should be performed in this callback
  @impl true
  def handle_setup(_ctx, state) do
    # due to the quirk in Nx.Serving, all servings need to be registered,
    # as it's impossible to make a call to Nx.Serving using PID
    # Generate a random process name. This creates one new atom every time
    # the callback runs.
    name =
      10
      |> :crypto.strong_rand_bytes()
      |> Base.encode64()
      |> String.to_atom()

    {:ok, _pid} = Model.start_link(name: name)

    {[], %State{state | detector: name}}
  end

  # The frames will be received in this callback
  @impl true
  def handle_buffer(:input, buffer, ctx, %State{detector: detector} = state) do
    tensor = buffer_to_tensor(buffer, ctx.pads.input.stream_format)
    {:ok, image} = Image.from_nx(tensor)

    # Run inference, then keep only the bounding boxes that are labelled
    # as a bird and score above the threshold
    predictions =
      detector
      |> Model.batched_run(tensor)
      |> Enum.filter(fn %BBox{score: score, label: label} ->
        score > @min_score and label == :bird
      end)

    # For each bounding box, represent it as a rectangle in the image
    image =
      Enum.reduce(predictions, image, fn %BBox{} = prediction, acc ->
        Image.Draw.rect!(
          acc,
          prediction.x1,
          prediction.y1,
          BBox.width(prediction),
          BBox.height(prediction),
          fill: false,
          color: :red,
          stroke_width: 5
        )
      end)

    # Emit the resulting buffer
    {[buffer: {:output, fill_buffer_with_image(image, buffer)}], state}
  end

  # Turns the buffer payload into a u8 tensor of shape {height, width, 3},
  # taking the frame dimensions from the pad's stream format
  defp buffer_to_tensor(%Membrane.Buffer{payload: payload}, %Membrane.RawVideo{
         width: w,
         height: h
       }) do
    payload
    |> Nx.from_binary(:u8)
    |> Nx.reshape({h, w, 3}, names: [:height, :width, :colors])
  end

  # Replaces the payload of the Membrane Buffer with the image contents
  # This way, we're maintaining the buffer metadata, ex. the timestamps
  defp fill_buffer_with_image(image, buffer) do
    image |> Image.to_nx!(shape: :hwc) |> Nx.to_binary() |> then(&%{buffer | payload: &1})
  end
end

Create the processing pipeline

Now that we have a Membrane Filter implemented, the next step is to define a processing pipeline.

In this case, we will read the video from the file, feed it through our Detector element and then transform it back into a video in .mp4 format.

The details of this process can be a little complicated. That said, in simple terms, we're going to:

  1. Read the file
  2. Parse the MP4 structure and extract the video from it
  3. Decode the video to obtain raw frames (images) and convert them to RGB
  4. Apply our Detector module
  5. Encode our images to H264
  6. Put our resulting video into the MP4 container
  7. Save the result to the file
defmodule Pipeline do
  use Membrane.Pipeline

  # Builds one linear chain of children:
  # file source -> MP4 demuxer -> H264 parser -> decoder -> RGB conversion ->
  # detector -> I420 conversion -> H264 encoder -> parser -> MP4 muxer -> file sink
  @impl true
  def handle_init(_ctx, {input_file, output_file}) do
    source = %Membrane.File.Source{location: input_file, chunk_size: 1024, seekable?: true}
    demuxer = %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}
    sink = %Membrane.File.Sink{location: output_file}

    spec =
      child(source)
      |> child(:demuxer, demuxer)
      |> via_out(Pad.ref(:output, 1))
      |> child(%Membrane.H264.Parser{output_stream_structure: :annexb})
      |> child(Membrane.H264.FFmpeg.Decoder)
      |> child(%Membrane.FFmpeg.SWScale.PixelFormatConverter{format: :RGB})
      |> child(Membrane.ExVision.Detector)
      |> child(%Membrane.FFmpeg.SWScale.PixelFormatConverter{format: :I420})
      |> child(%Membrane.H264.FFmpeg.Encoder{profile: :baseline})
      |> child(%Membrane.H264.Parser{output_stream_structure: :avc1})
      |> child(Membrane.MP4.Muxer.ISOM)
      |> child(:sink, sink)

    {[spec: spec], %{}}
  end

  # Once the :sink child reports end of stream on its :input pad, ask the
  # pipeline to terminate; end of stream from any other child is ignored
  @impl true
  def handle_element_end_of_stream(element, pad, _ctx, state) do
    if element == :sink and pad == :input do
      Membrane.Pipeline.terminate(self(), asynchronous?: true)
    end

    {[], state}
  end
end

You're welcome to run the inference on your own file, but please keep in mind that this pipeline is specific to MP4 files containing H264 video and no audio stream; it will not work on other types of files.

Run inference

We have written the Filter responsible for applying our model and the full processing pipeline! It's time to make use of it. Let's download our input file first:

# Fetch the example clip through ExVision's cache. The match fails unless the
# call returns an {:ok, _} tuple; the bound value is later passed to the
# pipeline as the source file location.
{:ok, input_file} = ExVision.Cache.lazy_get(ExVision.Cache, "assets/example.mp4")

Define the location of our output file:

# Place the result in the system temporary directory. The timestamp is written
# in the basic ISO 8601 format, so the file name contains no spaces or colons.
output_file =
  Path.join(System.tmp_dir!(), "#{DateTime.to_iso8601(DateTime.utc_now(), :basic)}.mp4")

And finally, execute our pipeline:

# Start the pipeline with the {input, output} tuple that Pipeline.handle_init/2
# destructures. Only the pipeline pid is kept; the supervisor pid is ignored.
{:ok, _supervisor_pid, pipeline_pid} =
  Membrane.Pipeline.start(Pipeline, {input_file, output_file})

Download the results

The pipeline is running in a separate process, therefore the previous call wasn't blocking. Our output file is not ready until the pipeline finishes and therefore terminates.

In order to get notified about the pipeline terminating, we will make use of Process.monitor/1:

# Monitor the pipeline process so that a :DOWN message arrives when it exits
monitor = Process.monitor(pipeline_pid)

# Block until the :DOWN message for this particular monitor is received.
# :timer.tc/1 reports the elapsed time in microseconds, measured from the
# moment this cell starts waiting.
{time, _result} =
  :timer.tc(fn ->
    receive do
      {:DOWN, ^monitor, :process, _pid, _reason} -> :ok
    end
  end)

Kino.Text.new("Operation took #{time / 1_000_000} seconds")

After the cell above has finished evaluating, our output file should be ready.

Let's write some code to fetch it from the notebook.

# Download button; the file is read only when the button is clicked
content_btn =
  Kino.Download.new(fn -> File.read!(output_file) end,
    label: "Download the video",
    filename: "video.mp4"
  )

delete_btn = Kino.Control.button("Delete the file permanently")
no_file_msg = Kino.Text.new("The file doesn't exist")

Kino.listen(delete_btn, fn _event ->
  # A repeated click finds the file already gone. Treat that the same as a
  # successful removal instead of raising inside the listener; any other
  # failure still raises, as File.rm!/1 would.
  case File.rm(output_file) do
    :ok ->
      Kino.render(no_file_msg)

    {:error, :enoent} ->
      Kino.render(no_file_msg)

    {:error, reason} ->
      raise File.Error, reason: reason, action: "remove file", path: output_file
  end
end)

# The value of this expression is what the cell displays
if File.exists?(output_file),
  do: Kino.Layout.grid([content_btn, delete_btn], gap: 10),
  else: no_file_msg