View Source Toolbox.Workflow (toolbox v5.4.10)

Workflow is an abstraction of a state machine designed to use with Altworx scenarios.

Overview

A workflow essentially describes a “blueprint” for a state machine. It specifies what possible states of the machine are, when does it move from one state to another, and what happens when such a transition is performed.

A concrete state machine that works according to the blueprint is called a workflow instance and is represented by the Toolbox.Workflow.Instance struct. It specifies what the current status and state are, what statuses it has gone through so far, and so on. See Toolbox.Workflow.Instance.t/0 for more information.

Note that what is normally referred to as states are in workflows called statuses. Statuses are from a finite set and are given in advance (e.g., a CD player can have three statuses: empty, idle, and playing). State, on the other hand, is an additional context enriching status with other information. State is not listed upfront and is not limited in size. For example, state of a CD player can be the artist and album of the CD that is currently playing.

In a workflow, statuses are defined by declared transitions. That is, the set of statuses is defined by all the statuses that are mentioned in the definition of transitions. Statuses are therefore a part of the workflow definition and cannot be changed later. State is defined when a new workflow instance is created. State can then be modified during the life-cycle of the instance using then callbacks.

Workflow instances consume messages received by scenarios and produce output actions. They are used from inside scenario units which feed them with messages, return produced output actions, and store their state (i.e., their instance struct) in their own state. As units are event-driven and can only react to messages, so are workflow instances and the only time when a transition may be performed (or when a workflow instance may be created) is when a message is received.

Most of the time, workflows are used via their wrapper for incident workflows (Toolbox.Incident). However, they can be used on their own as well.

Example

Here is a simple workflow for a CD player that has statuses and transitions as defined in the diagram below and that creates an event when a CD finishes. It operates on state that can be described with the following type: %{cd: %{album: String.t(), artist: String.t()} | nil}.

stateDiagram-v2
  direction LR
  empty --> idle: insert CD
  idle --> playing: play
  playing --> idle: stop
  playing --> idle: end of CD
  playing --> empty: eject
  idle --> empty: eject
defmodule CdPlayerWorkflow do
  alias Toolbox.Workflow
  alias Runbox.Message
  alias Runbox.Scenario.OutputAction, as: OA

  @cd_length :timer.minutes(74)

  def definition do
    Workflow.new()
    |> Workflow.add_transition(from: "empty", to: "idle", when: {__MODULE__, :insert_cd_msg},
                               then: {__MODULE__, :put_cd_to_state})
    |> Workflow.add_transition(from: "idle", to: "playing", when: {__MODULE__, :play_msg})
    |> Workflow.add_transition(from: "playing", to: "idle", when: {__MODULE__, :stop_msg})
    |> Workflow.add_transition(from: "playing", to: "idle", when: {:timeout, @cd_length},
                               side_effects: {__MODULE__, :create_played_event})
    |> Workflow.add_transition(from: "playing", to: "empty", when: {__MODULE__, :eject_msg},
                               then: {__MODULE__, :remove_cd_from_state})
    |> Workflow.add_transition(from: "idle", to: "empty", when: {__MODULE__, :eject_msg},
                               then: {__MODULE__, :remove_cd_from_state})
    |> Workflow.build()
  end

  def insert_cd_msg(_tran, _inst, %Message{type: type}), do: type == :insert_cd
  def eject_msg(_tran, _inst, %Message{type: type}), do: type == :eject
  def play_msg(_tran, _inst, %Message{type: type}), do: type == :play
  def stop_msg(_tran, _inst, %Message{type: type}), do: type == :stop

  def put_cd_to_state(_tran, inst, %Message{type: :insert_cd, body: body}) do
    cd = %{album: body.album, artist: body.artist}
    state = %{inst.state | cd: cd}
    {:ok, state}
  end

  def remove_cd_from_state(_tran, inst, _msg) do
    state = %{inst.state | cd: nil}
    {:ok, state}
  end

  def create_played_event(_tran, inst, msg) do
    oa = %OA.Event{
      type: "played_cd",
      template: "${actors.player} played ${params.album} by ${params.artist}",
      actors: %{"player" => %{asset_type: inst.type, asset_id: inst.id, name: nil}},
      params: %{"album" => inst.state.cd.album, "artist" => inst.state.cd.artist},
      origin_messages: [msg.origin]
    }
    {:ok, [oa]}
  end
end

An instance of this workflow may be then created like this:

{:ok, wf} = CdPlayerWorkflow.definition()
{:ok, oas, inst} = Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil}, msg)

Afterwards, when a message comes to your unit, you can then call handle_message/3 to process the message and get an updated instance together with a list of produced output actions:

{:ok, oas, inst} = Workflow.handle_message(wf, inst, msg)

Summary

Functions

Defines a new transition in a given workflow.

Finishes workflow definition, validates all configured dependencies and workflow structure.

Uses given workflow definition and message to update the status and state of a given instance.

Creates new blank workflow definition.

Creates a new instance for a given workflow.

Types

@type status() :: String.t()
@type t() :: %Toolbox.Workflow{
  built?: boolean(),
  statuses: [status()],
  terminal_statuses: [status()],
  transitions: transitions()
}
@type transitions() :: %{required(status()) => [Toolbox.Workflow.Transition.t()]}

Functions

Link to this function

add_transition(wf, params)

View Source
@spec add_transition(t(), Keyword.t()) :: t()

Defines a new transition in a given workflow.

The transition is defined using a keyword list with the following keys:

  • from - source status
    • required parameter
  • to - target status
    • required parameter
  • when - predicate used to select transition which will be executed
    • optional parameter
    • possible when definitions:
      • {Module, function}, where function accepts transition, instance and message as args, and returns a boolean.
      • {:timeout, timeout}, where timeout is defined in milliseconds
      • {:=, [path, to, state, key], value}
      • {:>, [path, to, state, key], value}
      • {:<, [path, to, state, key], value}
      • {:<=, [path, to, state, key], value}
      • {:>=, [path, to, state, key], value}
      • {:contains, [path, to, state, key], value}
      • {:is_in, [path, to, state, key], list_value}
      • a list of predicates where each has one of the shapes above
        • e.g., [{__MODULE__, :play_msg}, {:=, [:cd, :artist], "Pink Floyd"}]
        • all predicates in that list must hold for the combined predicate to hold
  • then - callback used to update workflow instance state during transition execution
    • optional parameter
    • possible then definitions:
      • {Module, function}, where function accepts transition, instance and message as args
      • a list of {Module, functions} items
        • listed callbacks are executed in the given order
    • calbacks should return {:ok, wf_instance_state :: map()} or {:error, reason}
  • side_effects - callback used to generate output actions during transition execution
    • optional parameter
    • possible side_effects definitions:
      • {Module, function}, where function accepts transition, instance and message as args
      • a list of {Module, functions} items
        • listed callbacks are executed in the given order
    • calbacks should return {:ok, [Runbox.Scenario.OutputAction.oa_params()]} or {:error, reason}
  • update_history_entry - callback used to modify transition execution history entry
    • optional parameter
    • possible update_history_entry definitions:
      • {Module, function}, where function accepts history entry, transition, instance and message as args
      • a list of {Module, functions} items
        • listed callbacks are executed in the given order
    • calbacks should return {:ok, updated_history_entry :: map()} or {:error, reason}
  • update_possible_transition - callback used in handle_message/3 to modify possible transition
    • optional parameter
    • possible update_possible_transition definitions:
      • {Module, function}, where function accepts possible transition, transition, instance and message as args
      • a list of {Module, functions} items
        • listed callbacks are executed in the given order
    • calbacks should return {:ok, updated_possible_transition :: map()} or {:error, reason}
@spec build(t()) ::
  {:ok, t()}
  | {:error, :transition_from_required}
  | {:error, :transition_to_required}
  | {:error, {:bad_callback, {atom(), atom()}}}
  | {:error, :multiple_init_statuses}

Finishes workflow definition, validates all configured dependencies and workflow structure.

Link to this function

handle_message(wf, wf_inst, msg)

View Source
@spec handle_message(t(), Toolbox.Workflow.Instance.t(), Runbox.Message.t()) ::
  {:ok, [Runbox.Scenario.OutputAction.oa_params()],
   Toolbox.Workflow.Instance.t()}
  | {:terminated, [Runbox.Scenario.OutputAction.oa_params()],
     Toolbox.Workflow.Instance.t()}
  | {:error, :not_built_yet}
  | {:error, :status_mismatch}

Uses given workflow definition and message to update the status and state of a given instance.

If no configured workflow transition matches, nothing will happen, i.e., instance status and state will remain the same.

Order of callback execution:

  1. when definitions of transitions in definition order
  2. then definitions of matching transition
  3. update_history_entry definitions of matching transition
  4. update_possible_transition definitions of matching transition
  5. side_effects definitions of matching transition
@spec new() :: t()

Creates new blank workflow definition.

Continue with calls to add_transition/2 and build/1 to define a workflow.

Link to this function

new_instance(wf, status, type, id, state, msg, options \\ [])

View Source
@spec new_instance(
  t(),
  status(),
  String.t(),
  String.t(),
  map(),
  Runbox.Message.t(),
  Keyword.t()
) ::
  {:ok, [Runbox.Scenario.OutputAction.oa_params()],
   Toolbox.Workflow.Instance.t()}
  | {:terminated, [Runbox.Scenario.OutputAction.oa_params()],
     Toolbox.Workflow.Instance.t()}
  | {:error, :unknown_status}

Creates a new instance for a given workflow.

status is the initial status of the instance. state is its initial state.

type and id may be arbitrary. If your instance represents an asset or incident, you may use its type and id. These two fields are kept in the Toolbox.Workflow.Instance struct and are available in all workflow callbacks, so you can use them e.g. to construct output actions. They don't directly influence the computation, they're just additional information available to the user.

msg is the message that causes this instance to be created. Its timestamp is used as the first timestamp in the instance history.

When a new instance is created, it internally performs a transition from the internal status :none to the given status. You may pass transition callbacks using options (in the same way as to add_transition/2) and these callbacks will be executed on this initial transition. All the callbacks except for when are supported.

The creation of an instance automatically adds the first entry to the instance history and sets the last_update field to the timestamp of msg.

Example

iex> Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil},
...>                       %Message{type: :discover_player, timestamp: 100})
{:ok, [],
  %Toolbox.Workflow.Instance{
    id: "Sony D-50",
    type: "cd_player",
    state: %{cd: nil},
    last_update: 100,
    status: "empty",
    history: [%{"status" => "empty", "timestamp" => 100}],
    possible_transitions: [%{"status" => "idle", "timestamp" => -1}],
    next_possible_transition_timestamp: nil,
    terminated?: false
}}