Reactor (reactor v0.8.1)

Reactor is a dynamic, concurrent, dependency-resolving saga orchestrator.

Usage

You can construct a reactor using the Reactor Spark DSL:

defmodule HelloWorldReactor do
  @moduledoc false
  use Reactor

  input :whom

  step :greet, Greeter do
    argument :whom, input(:whom)
  end

  return :greet
end

iex> Reactor.run(HelloWorldReactor, %{whom: "Dear Reader"})
{:ok, "Hello, Dear Reader!"}

or you can build it programmatically:

iex> reactor = Builder.new()
...> {:ok, reactor} = Builder.add_input(reactor, :whom)
...> {:ok, reactor} = Builder.add_step(reactor, :greet, Greeter, whom: {:input, :whom})
...> {:ok, reactor} = Builder.return(reactor, :greet)
...> Reactor.run(reactor, %{whom: nil})
{:ok, "Hello, World!"}
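Both examples above rely on a `Greeter` step module that this page does not define. The following is a minimal sketch of what such a module could look like, assuming it implements the `run/3` callback of the `Reactor.Step` behaviour and treats a `nil` input as "World". The module body is a guess chosen to match the two results shown above, not code taken from Reactor itself, and the `Mix.install/1` line is only there so the snippet can run as a standalone script.

```elixir
# Standalone script setup; in a Mix project, add :reactor to your deps instead.
Mix.install([{:reactor, "~> 0.8"}])

defmodule Greeter do
  # Hypothetical step module for illustration only.
  use Reactor.Step

  @impl true
  def run(arguments, _context, _options) do
    # Arguments arrive as a map keyed by argument name.
    # Assumption: a nil :whom falls back to "World".
    whom = arguments.whom || "World"
    {:ok, "Hello, #{whom}!"}
  end
end

# The callback is a plain function, so it can be exercised directly.
IO.inspect(Greeter.run(%{whom: nil}, %{}, []))
IO.inspect(Greeter.run(%{whom: "Dear Reader"}, %{}, []))
```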

Options

  • :extensions (list of modules that adopt Spark.Dsl.Extension) - A list of DSL extensions to add to the Spark.Dsl.

  • :otp_app (atom/0) - The otp_app to use for any application-configurable options.

  • :fragments (list of module/0) - Fragments to include in the Spark.Dsl. See the fragments guide for more.

Types

@type async_option() :: {:async?, boolean()}

When set to false forces the Reactor to run every step synchronously, regardless of the step configuration.

Defaults to true.

@type concurrency_key_option() :: {:concurrency_key, reference()}

Use a Reactor.Executor.ConcurrencyTracker.pool_key to allow this Reactor to share its concurrency pool with other Reactor instances.

If you do not specify one, the Reactor will initialise a new pool and place it in its context for any child Reactors to re-use.

Only used if async? is set to true.

@type context() :: %{optional(atom()) => any()}
@type context_arg() :: Enumerable.t({atom(), any()})
@type halt_timeout_option() :: {:halt_timeout, pos_integer() | :infinity}

How long to wait for asynchronous steps to complete when halting.

Defaults to 5000ms.

@type inputs() :: %{optional(atom()) => any()}

@type max_concurrency_option() :: {:max_concurrency, pos_integer()}

Specify the maximum number of asynchronous steps which can be run in parallel.

Defaults to the result of System.schedulers_online/0. Only used if async? is set to true.

@type max_iterations_option() :: {:max_iterations, pos_integer() | :infinity}

The maximum number of iterations after which the Reactor will halt.

Defaults to :infinity.

@type state() :: :pending | :executing | :halted | :failed | :successful
@type t() :: %Reactor{
  context: context(),
  id: any(),
  inputs: [atom()],
  intermediate_results: %{required(any()) => any()},
  middleware: [Reactor.Middleware.t()],
  plan: nil | Graph.t(),
  return: any(),
  state: state(),
  steps: [Reactor.Step.t()],
  undo: [{Reactor.Step.t(), any()}]
}
@type timeout_option() :: {:timeout, pos_integer() | :infinity}

Specify the amount of execution time after which to halt processing.

Note that this is not a hard limit. The Reactor will stop when the first step completes after the timeout has expired.

Defaults to :infinity.

Functions

is_reactor(reactor) (macro)
@spec is_reactor(any()) :: Macro.t()

A guard which returns true if the value is a Reactor struct.
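Because `is_reactor/1` is a macro, it has to be imported or required before it can be used in a guard clause. A minimal sketch follows, assuming `Reactor.Builder.new/0` returns a `Reactor` struct as in the usage example. `ReactorCheck` and its return atoms are hypothetical names used only for illustration.

```elixir
# Standalone script setup; in a Mix project, add :reactor to your deps instead.
Mix.install([{:reactor, "~> 0.8"}])

defmodule ReactorCheck do
  # Hypothetical helper module, not part of Reactor.
  import Reactor, only: [is_reactor: 1]

  # Matches only values that are Reactor structs.
  def kind(value) when is_reactor(value), do: :reactor_struct
  # A bare atom may be a Reactor DSL module, but the guard cannot tell.
  def kind(value) when is_atom(value), do: :maybe_reactor_module
  def kind(_value), do: :other
end

IO.inspect(ReactorCheck.kind(Reactor.Builder.new()))
IO.inspect(ReactorCheck.kind(%{}))
```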

run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
@spec run(t() | module(), inputs(), context_arg(), options()) ::
  {:ok, any()} | {:error, any()} | {:halted, t()}

Attempt to run a Reactor.

Arguments

  • reactor - The Reactor to run, either a Reactor DSL module, or a Reactor struct.
  • inputs - A map of values passed in to satisfy the Reactor's expected inputs.
  • context - An arbitrary map that will be merged into the Reactor context and passed into each step.

Options

  • :max_concurrency (pos_integer/0) - The maximum number of processes to use to run the Reactor.

  • :timeout - How long to allow the Reactor to run for. The default value is :infinity.

  • :max_iterations - The maximum number of times to allow the Reactor to loop. The default value is :infinity.

  • :async? (boolean/0) - Whether to allow the Reactor to start processes. The default value is true.
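As a sketch of how these options and the three return shapes in the spec fit together, the snippet below runs the `HelloWorldReactor` from the usage section synchronously with explicit limits. It uses the `:async?` key as given by the `async_option()` type. `Greeter` and `RunExample` are hypothetical modules written for this illustration, and the way each result is handled is just one reasonable choice.

```elixir
# Standalone script setup; in a Mix project, add :reactor to your deps instead.
Mix.install([{:reactor, "~> 0.8"}])

defmodule Greeter do
  # Hypothetical step module; assumes nil falls back to "World".
  use Reactor.Step

  @impl true
  def run(arguments, _context, _options) do
    {:ok, "Hello, #{arguments.whom || "World"}!"}
  end
end

defmodule HelloWorldReactor do
  @moduledoc false
  use Reactor

  input :whom

  step :greet, Greeter do
    argument :whom, input(:whom)
  end

  return :greet
end

defmodule RunExample do
  # Hypothetical wrapper showing one way to handle each result shape.
  def greet(whom) do
    # Run every step synchronously, with explicit iteration and time limits.
    options = [async?: false, max_iterations: 100, timeout: 5_000]

    case Reactor.run(HelloWorldReactor, %{whom: whom}, %{}, options) do
      {:ok, greeting} -> greeting
      {:error, reason} -> raise "reactor failed: #{inspect(reason)}"
      {:halted, _reactor} -> raise "reactor halted before completion"
    end
  end
end

IO.puts(RunExample.greet("Dear Reader"))
```

Raising on `{:halted, reactor}` is a simplification: the tuple carries the Reactor struct, so a caller could keep it for later inspection instead.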

run!(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
@spec run!(t() | module(), inputs(), context_arg(), options()) :: any() | no_return()

Raising version of run/4.