Tutorial View Source

Run in Livebook

Your first pipeline

Mix.install([{:opus, "~> 0.8"}, {:kino, "~> 0.5"}])

Below you'll see a simple pipeline with two steps. It takes a number, adds 1 then multiplies by 2. When a module is made a pipeline with use Opus.Pipeline it can be called with a call/1 function.

So our module below can be called with:

ArithmeticPipeline.call(number)
defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step(:add_one, with: &(&1 + 1))
  step(:multiply_by_two)

  def multiply_by_two(n), do: n * 2
end
input = Kino.Input.number("number", default: 0)
number = Kino.Input.read(input)

ArithmeticPipeline.call(number)

Error Handling

So far we've only defined step stages.

This stage processes the input value and with a success value the next stage is called with that value. With an error value the pipeline is halted and an {:error, any} is returned.

defmodule ArithmeticPipelineWithErrors do
  use Opus.Pipeline

  step(:add_one, with: &(&1 + 1))
  step(:multiply_by_two)
  step(:add_three, with: &(&1 + 3))

  def multiply_by_two(n) when n < 42, do: n * 2
  def multiply_by_two(_), do: {:error, "I only handle numbers < 42"}
end

Try this out with a number > 42 and you should see an Opus.PipelineError. As you can see when a step function returns an error tuple like {:error, "I only handle numbers < 42"} the pipeline is halted an the next stages are not executed.

input2 = Kino.Input.number("input2", default: 43)
number = Kino.Input.read(input2)

ArithmeticPipelineWithErrors.call(number)

Validating Input

defmodule Validator do
  use Opus.Pipeline

  check(:valid_user, with: &match?(%{user: %{id: id}} when is_integer(id), &1))

  check(:even_user, with: &(rem(&1.user.id, 2) == 0), error_message: "User should have an even id")
end
input3 = Kino.Input.number("input3", default: 3)
number = Kino.Input.read(input3)

Validator.call(%{user: %{id: number}})

The error message to return when a check fails is configurable. It can be an atom, string or a function.

defmodule ValidatorWithError do
  use Opus.Pipeline

  check(:valid_user, with: &match?(%{user: %{id: id}} when is_integer(id), &1))

  check(:even_user,
    with: &(rem(&1.user.id, 2) == 0),
    error_message: fn %{user: %{id: id}} ->
      "Oh the user should have an even id, #{id} is not even"
    end
  )
end
ValidatorWithError.call(%{user: %{id: 5}})

Side-effects with the tee stage

You can use the tee macro for side-effects. The return value in such stages is ignored.

defmodule ArithmeticSideEffectsPipeline do
  use Opus.Pipeline

  step(:add_one, with: &(&1 + 1))

  tee(:print_number,
    with: fn n ->
      IO.puts("The given number is.. #{n}")
      # The following error will be ignored
      raise "error"
    end
  )

  step(:multiply_by_two)

  def multiply_by_two(n), do: n * 2
end

Notice how raising an error does not halt the pipeline with tee.

input4 = Kino.Input.number("input4", default: 5)
number = Kino.Input.read(input4)

ArithmeticSideEffectsPipeline.call(number)

Linking Pipelines

Pipelines can call other pipelines and there's the link macro to make that easier. link is essentially a step where the linked pipeline is called with the step function argument and the step returns the return value of the linked pipeline.

input5 = Kino.Input.text("input4", default: "5")
input6 = Kino.Input.text("input4", default: "6")
defmodule ReadFirstInput do
  use Opus.Pipeline

  step(:read, with: &put_in(&1[:a], Kino.Input.read(&1[:input_a])))
end

defmodule ReadSecondInput do
  use Opus.Pipeline

  step(:read, with: &put_in(&1[:b], Kino.Input.read(&1[:input_b])))
end

defmodule Calculator do
  use Opus.Pipeline

  link(ReadFirstInput)
  link(ReadSecondInput)
  step(:parse)
  step(:add, with: &(&1.a + &1.b), if: &match?(%{operation: :add}, &1))
  step(:multiply, with: &(&1.a * &1.b), if: &match?(%{operation: :multiply}, &1))

  def parse(%{a: a, b: b} = calculation) do
    {a, _} = Integer.parse(a)
    {b, _} = Integer.parse(b)

    %{calculation | a: a, b: b}
  end
end

Notice how we leverage the if option to calculate based on the given operation. Both if and unless can be used to make a stage optional.

Calculator.call(%{operation: :add, input_a: input5, input_b: input6})
Calculator.call(%{operation: :multiply, input_a: input5, input_b: input6})