View Source Evision.DNN Example - Generic Object Detection Task

# set `EVISION_PREFER_PRECOMPILED` to `false` 
# if you prefer `:evision` to be compiled from source
# note that to compile from source, you may need at least 1GB RAM
# System.put_env("EVISION_PREFER_PRECOMPILED", "false")

Mix.install([
  {:evision, "~> 0.1.10"},
  {:req, "~> 0.3"},
  {:kino, "~> 0.7"}
])
:ok

define-some-helper-functions

Define Some Helper Functions

defmodule Helper do
  def download!(url, save_as, overwrite? \\ false) do
    unless File.exists?(save_as) do
      Req.get!(url, http_errors: :raise, output: save_as, cache: not overwrite?)
    end

    :ok
  end
end
{:module, Helper, <<70, 79, 82, 49, 0, 0, 10, ...>>, {:download!, 3}}

write-the-generic-detectionmodel-module

Write the Generic DetectionModel Module

alias Evision, as: Cv

# change to the file's directory
# or somewhere you have write permission
File.cd!(__DIR__)

defmodule DetectionModel do
  def visualise_pred(mat, _labels, []), do: {:ok, mat}

  def visualise_pred(mat, labels, [translated_out | outs]) do
    {:ok, mat} = _visualise_pred(mat, labels, translated_out)
    visualise_pred(mat, labels, outs)
  end

  defp _visualise_pred(mat, _labels, []), do: {:ok, mat}

  defp _visualise_pred(mat, labels, [{class_id, confidence, l, t, r, b} | outs]) do
    confidence = "#{Float.round(confidence, 2)}"
    label = Enum.at(labels, class_id)
    text = "#{label}: #{confidence}"
    mat = Cv.rectangle(mat, {l, t}, {r, b}, {255, 0, 0})

    {{label_weight, label_height}, baseline} =
      Cv.getTextSize(text, Cv.cv_FONT_HERSHEY_SIMPLEX(), 0.5, 1)

    label_weight = trunc(label_weight)
    label_height = trunc(label_height)
    top = max(t, label_height)

    mat =
      Cv.rectangle(mat, {l, top - label_height}, {l + label_weight, top + baseline}, {
        255,
        255,
        255
      })

    mat = Cv.putText(mat, text, {l, top}, Cv.cv_FONT_HERSHEY_SIMPLEX(), 0.5, {0, 0, 255})

    _visualise_pred(mat, labels, outs)
  end

  def postprocess(mat, detections, net, confidence_threshold) do
    out_layers = Cv.DNN.Net.getUnconnectedOutLayers(net)
    out_layer = Cv.DNN.Net.getLayer(net, Enum.at(out_layers, 0))
    out_layer_type = Cv.DNN.Layer.get_type(out_layer) |> IO.iodata_to_binary()
    _postprocess(mat, detections, net, confidence_threshold, out_layer_type, [])
  end

  defp _postprocess(_mat, [], _net, _confidence_threshold, <<"DetectionOutput">>, acc),
    do: {:ok, Enum.reverse(acc)}

  defp _postprocess(
         %Evision.Mat{shape: {h, w, _}} = mat,
         [outs | detections],
         net,
         confidence_threshold,
         <<"DetectionOutput">>,
         acc
       ) do
    data = Cv.Mat.to_binary(outs)
    {:ok, translated_outs} = _translate_outs(confidence_threshold, data, h, w, [])

    _postprocess(mat, detections, net, confidence_threshold, "DetectionOutput", [
      translated_outs | acc
    ])
  end

  defp _translate_outs(_confidence_threshold, <<>>, _h, _w, acc), do: {:ok, acc}

  defp _translate_outs(
         confidence_threshold,
         <<_batch_id::float-size(32)-little, class_id::float-size(32)-little,
           confidence::float-size(32)-little, left::float-size(32)-little,
           top::float-size(32)-little, right::float-size(32)-little,
           bottom::float-size(32)-little, rest::binary>>,
         h,
         w,
         acc
       ) do
    if confidence > confidence_threshold do
      [class_id, l, t, r, b] =
        Enum.map([class_id, left, top, right, bottom], fn f -> trunc(f) end)

      width = r - l + 1
      height = b - t + 1

      [l, t, r, b] =
        if width <= 2 or height <= 2 do
          Enum.map([left * w, top * h, right * w, bottom * h], fn f -> trunc(f) end)
        else
          [l, t, r, b]
        end

      _translate_outs(confidence_threshold, rest, h, w, [
        {class_id - 1, confidence, l, t, r, b} | acc
      ])
    else
      _translate_outs(confidence_threshold, rest, h, w, acc)
    end
  end

  def get_labels(class_label_file) do
    class_label_file
    |> File.read!()
    |> String.split("\n")
  end

  def predict(mat, model, out_names, opts \\ []) do
    blob = Cv.DNN.blobFromImage(mat, opts)

    model = Cv.DNN.Net.setInput(model, blob, name: "", scalefactor: 1.0, mean: [0, 0, 0])

    start_time = :os.system_time(:millisecond)
    detections = Cv.DNN.Net.forward(model, outBlobNames: out_names)
    end_time = :os.system_time(:millisecond)
    IO.puts("Inference time=>#{end_time - start_time} ms")
    {:ok, mat, detections}
  end

  def predict_file(image_file, model, out_names, opts \\ []) do
    predict(Cv.imread(image_file), model, out_names, opts)
  end

  def get_model(params, config, framework \\ "") do
    net =
      Cv.DNN.readNet(params,
        config: config,
        framework: framework
      )

    out_names = Cv.DNN.Net.getUnconnectedOutLayersNames(net)
    {:ok, net, out_names}
  end
end
{:module, DetectionModel, <<70, 79, 82, 49, 0, 0, 34, ...>>, {:get_model, 3}}

example-detect-model-ssd-mobilenetv2

Example Detect Model: SSD MobileNetv2

Basic steps:

  1. Download model weights and config file, as well as a list of class names.
  2. DetectionModel.get_model.
  3. DetectionModel.get_labels.
  4. DetectionModel.predict and specify some preprocessing parameters.
  5. DetectionModel.postprocess. This translates nerual network outputs to data that is easier to read/use.
  6. DetectionModel.visualise_pred if you want to see the result.
defmodule SSDMobileNetV2 do
  defp download_model() do
    Helper.download!(
      "https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pbtxt",
      "ssd_mobilenet_v2_coco_2018_03_29.pbtxt"
    )

    Helper.download!(
      "https://raw.githubusercontent.com/cocoa-xu/evision/main/test/testdata/models/coco_names.txt",
      "coco_names.txt"
    )

    graph_pb = "ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb"

    if !File.exists?(graph_pb) do
      Helper.download!(
        "http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v2_coco_2018_03_29.tar.gz",
        "ssd_mobilenet_v2_coco_2018_03_29.tar.gz"
      )

      "ssd_mobilenet_v2_coco_2018_03_29.tar.gz"
      |> File.read!()
      |> :zlib.gunzip()
      |> then(&:erl_tar.extract({:binary, &1}, [:memory, :compressed]))
      |> elem(1)
      |> Enum.map(fn {filename, content} -> {List.to_string(filename), content} end)
      |> Enum.reject(
        &(elem(&1, 0) != "ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb")
      )
      |> Enum.at(0)
      |> then(fn {_filename, content} ->
        File.mkdir_p!("ssd_mobilenet_v2_coco_2018_03_29")
        File.write!(graph_pb, content)
        :ok
      end)
    else
      :ok
    end
  end

  def get_detection_model() do
    :ok = download_model()
    graph_pb = "ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb"

    {:ok, net, out_names} =
      DetectionModel.get_model(
        graph_pb,
        "ssd_mobilenet_v2_coco_2018_03_29.pbtxt"
      )

    labels = DetectionModel.get_labels("coco_names.txt")
    {net, out_names, labels}
  end

  def predict_file_and_show(filename, confidence_threshold \\ 0.5) when is_binary(filename) do
    {net, out_names, labels} = get_detection_model()

    {:ok, mat, detections} =
      DetectionModel.predict_file(filename, net, out_names,
        scalefactor: 1,
        swapRB: true,
        mean: [0, 0, 0],
        size: [300, 300]
      )

    {:ok, translated_outs} =
      DetectionModel.postprocess(mat, detections, net, confidence_threshold)

    {:ok, mat} = DetectionModel.visualise_pred(mat, labels, translated_outs)
    mat
  end

  def predict_and_show(net, out_names, labels, mat, confidence_threshold \\ 0.5)
      when is_reference(mat) do
    {:ok, mat, detections} =
      DetectionModel.predict(mat, net, out_names,
        scalefactor: 1,
        swapRB: true,
        mean: [0, 0, 0],
        size: [300, 300]
      )

    {:ok, translated_outs} =
      DetectionModel.postprocess(mat, detections, net, confidence_threshold)

    {:ok, mat} = DetectionModel.visualise_pred(mat, labels, translated_outs)
    mat
  end
end
{:module, SSDMobileNetV2, <<70, 79, 82, 49, 0, 0, 22, ...>>, {:predict_and_show, 5}}

detect-objects-in-an-image

Detect Objects in An Image

Helper.download!(
  "https://raw.githubusercontent.com/cocoa-xu/evision/main/test/dnn_detection_test.jpg",
  "dnn_detection_test.jpg"
)

%Evision.Mat{type: type} = mat = SSDMobileNetV2.predict_file_and_show("dnn_detection_test.jpg")
Inference time=>35 ms
%Evision.Mat{
  channels: 3,
  dims: 2,
  type: {:u, 8},
  raw_type: 16,
  shape: {586, 872, 3},
  ref: #Reference<0.1834280076.3317825552.216669>
}
Cv.imencode(".png", mat)
|> Kino.Image.new(:png)

detect-objects-in-a-video-stream

Detect Objects in a Video Stream

# if you have a camera available
# uncomment the line below
# video = Cv.VideoCapture.videoCapture(0)

# or the OpenCV library you compiled can decode video files
# uncomment the line below
# video = Cv.VideoCapture.videoCapture("/path/to/your/video/file")
nil
defmodule VideoDetection do
  def detect(video, widget, max_frames \\ 30 * 60)
      when is_reference(video) and is_integer(max_frames) do
    {net, out_names, labels} = SSDMobileNetV2.get_detection_model()
    frame_read = Cv.VideoCapture.read(video)
    _detect(net, out_names, labels, frame_read, video, widget, max_frames)
  end

  defp _detect(_, _, _, _, _, _, 0), do: :ok

  defp _detect(net, out_names, labels, frame, video, widget, left_frames)
       when left_frames > 0 or left_frames < 0 do
    mat = SSDMobileNetV2.predict_and_show(net, out_names, labels, frame)

    Cv.imencode(".png", mat)
    |> Kino.Image.new(:png)
    |> then(&Kino.Frame.render(widget, &1))

    frame_read = Cv.VideoCapture.read(video)

    if left_frames > 0 do
      _detect(net, out_names, labels, frame_read, video, widget, left_frames - 1)
    else
      _detect(net, out_names, labels, frame_read, video, widget, left_frames)
    end
  end
end
{:module, VideoDetection, <<70, 79, 82, 49, 0, 0, 12, ...>>, {:_detect, 7}}

run-the-detection

Run the detection

widget = Kino.Frame.new() |> Kino.render()
# read 1,800 frames at most
# change the number to negative values to
# detect objects until OpenCV cannot read
# new frame from the video stream
VideoDetection.detect(video, widget, 1800)
"Inference time=>48 ms"