Toolbox.Workflow (toolbox v5.4.10)
Workflow is an abstraction of a state machine designed for use with Altworx scenarios.
Overview
A workflow essentially describes a “blueprint” for a state machine. It specifies what the possible states of the machine are, when it moves from one state to another, and what happens when such a transition is performed.
A concrete state machine that works according to the blueprint is called a workflow instance and is represented by the Toolbox.Workflow.Instance struct. It specifies what the current status and state are, what statuses it has gone through so far, and so on. See Toolbox.Workflow.Instance.t/0 for more information.
Note that what is normally referred to as states is called statuses in workflows. Statuses come from a finite set and are given in advance (e.g., a CD player can have three statuses: empty, idle, and playing). State, on the other hand, is additional context enriching the status with other information. State is not listed upfront and is not limited in size. For example, the state of a CD player can be the artist and album of the CD that is currently playing.
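The distinction between status and state can be illustrated with a small standalone sketch. It does not use Toolbox at all; the module name CdPlayerSketch and its functions are hypothetical and only mirror the CD player example: statuses are a fixed list, while state is a free-form map.

```elixir
defmodule CdPlayerSketch do
  # Statuses: a finite set that is known in advance.
  @statuses ["empty", "idle", "playing"]

  def statuses, do: @statuses

  # An "instance" in this sketch is just a map pairing one of the fixed
  # statuses with an arbitrary state map (here: the CD being played).
  def new(status, state) when status in @statuses and is_map(state) do
    %{status: status, state: state}
  end
end

player = CdPlayerSketch.new("playing", %{cd: %{album: "The Wall", artist: "Pink Floyd"}})
IO.inspect(player.status, label: "status")
IO.inspect(player.state, label: "state")
```

Calling CdPlayerSketch.new/2 with a status outside the fixed list raises a FunctionClauseError, whereas the state map may hold anything.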
In a workflow, statuses are defined by declared transitions. That is, the set of statuses is defined by all the statuses that are mentioned in the definition of transitions. Statuses are therefore a part of the workflow definition and cannot be changed later. State is defined when a new workflow instance is created. State can then be modified during the life-cycle of the instance using then callbacks.
Workflow instances consume messages received by scenarios and produce output actions. They are used from inside scenario units, which feed them with messages, return the produced output actions, and store the instance struct in their own state. Units are event-driven and can only react to messages, and the same holds for workflow instances: a transition can be performed (or a workflow instance created) only when a message is received.
Most of the time, workflows are used via their wrapper for incident workflows (Toolbox.Incident). However, they can be used on their own as well.
Example
Here is a simple workflow for a CD player that has statuses and transitions as defined
in the diagram below and that creates an event when a CD finishes.
It operates on state that can be described with the following type: %{cd: %{album: String.t(), artist: String.t()} | nil}.
stateDiagram-v2
direction LR
empty --> idle: insert CD
idle --> playing: play
playing --> idle: stop
playing --> idle: end of CD
playing --> empty: eject
idle --> empty: eject
defmodule CdPlayerWorkflow do
  alias Toolbox.Workflow
  alias Runbox.Message
  alias Runbox.Scenario.OutputAction, as: OA

  @cd_length :timer.minutes(74)

  def definition do
    Workflow.new()
    |> Workflow.add_transition(from: "empty", to: "idle", when: {__MODULE__, :insert_cd_msg},
                               then: {__MODULE__, :put_cd_to_state})
    |> Workflow.add_transition(from: "idle", to: "playing", when: {__MODULE__, :play_msg})
    |> Workflow.add_transition(from: "playing", to: "idle", when: {__MODULE__, :stop_msg})
    |> Workflow.add_transition(from: "playing", to: "idle", when: {:timeout, @cd_length},
                               side_effects: {__MODULE__, :create_played_event})
    |> Workflow.add_transition(from: "playing", to: "empty", when: {__MODULE__, :eject_msg},
                               then: {__MODULE__, :remove_cd_from_state})
    |> Workflow.add_transition(from: "idle", to: "empty", when: {__MODULE__, :eject_msg},
                               then: {__MODULE__, :remove_cd_from_state})
    |> Workflow.build()
  end

  def insert_cd_msg(_tran, _inst, %Message{type: type}), do: type == :insert_cd
  def eject_msg(_tran, _inst, %Message{type: type}), do: type == :eject
  def play_msg(_tran, _inst, %Message{type: type}), do: type == :play
  def stop_msg(_tran, _inst, %Message{type: type}), do: type == :stop

  def put_cd_to_state(_tran, inst, %Message{type: :insert_cd, body: body}) do
    cd = %{album: body.album, artist: body.artist}
    state = %{inst.state | cd: cd}
    {:ok, state}
  end

  def remove_cd_from_state(_tran, inst, _msg) do
    state = %{inst.state | cd: nil}
    {:ok, state}
  end

  def create_played_event(_tran, inst, msg) do
    oa = %OA.Event{
      type: "played_cd",
      template: "${actors.player} played ${params.album} by ${params.artist}",
      actors: %{"player" => %{asset_type: inst.type, asset_id: inst.id, name: nil}},
      params: %{"album" => inst.state.cd.album, "artist" => inst.state.cd.artist},
      origin_messages: [msg.origin]
    }

    {:ok, [oa]}
  end
end
An instance of this workflow may then be created like this:
{:ok, wf} = CdPlayerWorkflow.definition()
{:ok, oas, inst} = Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil}, msg)
Afterwards, when a message comes to your unit, you can call handle_message/3 to process the message and get an updated instance together with a list of produced output actions:
{:ok, oas, inst} = Workflow.handle_message(wf, inst, msg)
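According to its spec, handle_message/3 can also return {:terminated, oas, inst} and {:error, reason}, so a unit will usually want to branch on the result. The following is a minimal sketch of that dispatch, not the way Altworx units are actually written: the module UnitSketch, the function on_message/4, and the :instance key in the unit state are all hypothetical. The handler function is injectable (defaulting to Toolbox.Workflow.handle_message/3) so the sketch can be tried without the library, and dropping the instance on :terminated is an assumed design choice.

```elixir
defmodule UnitSketch do
  # Hypothetical helper, not part of Toolbox or Runbox.
  # `unit_state` is assumed to be a map that keeps the workflow instance
  # under the :instance key. `handle` defaults to the documented
  # Toolbox.Workflow.handle_message/3 and can be replaced by a stub.
  def on_message(wf, unit_state, msg, handle \\ &Toolbox.Workflow.handle_message/3) do
    case handle.(wf, unit_state.instance, msg) do
      {:ok, oas, inst} ->
        # Keep the updated instance and pass the output actions on.
        {:ok, oas, %{unit_state | instance: inst}}

      {:terminated, oas, _inst} ->
        # Assumption of this sketch: a terminated instance is no longer needed.
        {:ok, oas, %{unit_state | instance: nil}}

      {:error, reason} ->
        # E.g. :not_built_yet or :status_mismatch, as listed in the spec.
        {:error, reason}
    end
  end
end

# Usage with a stub standing in for the real handle_message/3:
stub = fn _wf, inst, _msg -> {:ok, [], %{inst | status: "idle"}} end
IO.inspect(UnitSketch.on_message(:wf, %{instance: %{status: "empty"}}, :msg, stub))
```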
Summary
Functions
add_transition/2 - Defines a new transition in a given workflow.
build/1 - Finishes workflow definition, validates all configured dependencies and workflow structure.
handle_message/3 - Uses given workflow definition and message to update the status and state of a given instance.
new/0 - Creates new blank workflow definition.
new_instance/7 - Creates a new instance for a given workflow.
Types
@type status() :: String.t()
@type t() :: %Toolbox.Workflow{
  built?: boolean(),
  statuses: [status()],
  terminal_statuses: [status()],
  transitions: transitions()
}
@type transitions() :: %{required(status()) => [Toolbox.Workflow.Transition.t()]}
Functions
Defines a new transition in a given workflow.
The transition is defined using a keyword list with the following keys:
- from - source status
  - required parameter
- to - target status
  - required parameter
- when - predicate used to select the transition which will be executed
  - optional parameter
  - possible when definitions:
    - {Module, function}, where function accepts transition, instance and message as args, and returns a boolean
    - {:timeout, timeout}, where timeout is defined in milliseconds
    - {:=, [path, to, state, key], value}
    - {:>, [path, to, state, key], value}
    - {:<, [path, to, state, key], value}
    - {:<=, [path, to, state, key], value}
    - {:>=, [path, to, state, key], value}
    - {:contains, [path, to, state, key], value}
    - {:is_in, [path, to, state, key], list_value}
    - a list of predicates where each has one of the shapes above
      - e.g., [{__MODULE__, :play_msg}, {:=, [:cd, :artist], "Pink Floyd"}]
      - all predicates in that list must hold for the combined predicate to hold
- then - callback used to update workflow instance state during transition execution
  - optional parameter
  - possible then definitions:
    - {Module, function}, where function accepts transition, instance and message as args
    - a list of {Module, function} items
      - listed callbacks are executed in the given order
  - callbacks should return {:ok, wf_instance_state :: map()} or {:error, reason}
- side_effects - callback used to generate output actions during transition execution
  - optional parameter
  - possible side_effects definitions:
    - {Module, function}, where function accepts transition, instance and message as args
    - a list of {Module, function} items
      - listed callbacks are executed in the given order
  - callbacks should return {:ok, [Runbox.Scenario.OutputAction.oa_params()]} or {:error, reason}
- update_history_entry - callback used to modify the transition execution history entry
  - optional parameter
  - possible update_history_entry definitions:
    - {Module, function}, where function accepts history entry, transition, instance and message as args
    - a list of {Module, function} items
      - listed callbacks are executed in the given order
  - callbacks should return {:ok, updated_history_entry :: map()} or {:error, reason}
- update_possible_transition - callback used in handle_message/3 to modify a possible transition
  - optional parameter
  - possible update_possible_transition definitions:
    - {Module, function}, where function accepts possible transition, transition, instance and message as args
    - a list of {Module, function} items
      - listed callbacks are executed in the given order
  - callbacks should return {:ok, updated_possible_transition :: map()} or {:error, reason}
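The tuple-shaped when predicates take a path into the instance state, as the [:cd, :artist] example suggests. Their exact semantics are not spelled out here, so the following standalone sketch shows only one plausible reading and is not Toolbox code. The module StatePredicateSketch is hypothetical. It assumes that the path is resolved with get_in/2, that :contains checks list membership of the value in the state, that :is_in checks membership of the state value in the given list, and that ordering operators apply to numbers only. Module-function and timeout predicates are left out.

```elixir
defmodule StatePredicateSketch do
  # Hypothetical evaluator, NOT part of Toolbox: one plausible reading of the
  # {operator, path, value} predicates evaluated against an instance state map.

  # A list of predicates holds only if every predicate in it holds.
  def holds?(predicates, state) when is_list(predicates) do
    Enum.all?(predicates, &holds?(&1, state))
  end

  # A single predicate: fetch the value stored at `path`, then compare it.
  def holds?({op, path, expected}, state) when is_list(path) do
    compare(op, get_in(state, path), expected)
  end

  defp compare(:=, actual, expected), do: actual == expected
  defp compare(:is_in, actual, list) when is_list(list), do: actual in list
  defp compare(:contains, actual, item) when is_list(actual), do: item in actual
  # Ordering operators are restricted to numbers (an assumption of this sketch).
  defp compare(:>, a, b) when is_number(a) and is_number(b), do: a > b
  defp compare(:<, a, b) when is_number(a) and is_number(b), do: a < b
  defp compare(:>=, a, b) when is_number(a) and is_number(b), do: a >= b
  defp compare(:<=, a, b) when is_number(a) and is_number(b), do: a <= b
  # Missing keys and type mismatches are treated as "does not hold".
  defp compare(_op, _actual, _expected), do: false
end

state = %{cd: %{album: "The Wall", artist: "Pink Floyd", tracks: 26}}
IO.inspect(StatePredicateSketch.holds?({:=, [:cd, :artist], "Pink Floyd"}, state))
```

The :tracks key is made up for the illustration; the CD player state in the example above only carries album and artist.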
@spec build(t()) ::
  {:ok, t()}
  | {:error, :transition_from_required}
  | {:error, :transition_to_required}
  | {:error, {:bad_callback, {atom(), atom()}}}
  | {:error, :multiple_init_statuses}
Finishes workflow definition, validates all configured dependencies and workflow structure.
@spec handle_message(t(), Toolbox.Workflow.Instance.t(), Runbox.Message.t()) ::
  {:ok, [Runbox.Scenario.OutputAction.oa_params()], Toolbox.Workflow.Instance.t()}
  | {:terminated, [Runbox.Scenario.OutputAction.oa_params()], Toolbox.Workflow.Instance.t()}
  | {:error, :not_built_yet}
  | {:error, :status_mismatch}
Uses given workflow definition and message to update the status and state of a given instance.
If no configured workflow transition matches, nothing will happen, i.e., instance status and state will remain the same.
Order of callback execution:
- when definitions of transitions in definition order
- then definitions of matching transition
- update_history_entry definitions of matching transition
- update_possible_transition definitions of matching transition
- side_effects definitions of matching transition
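When a callback option is given as a list of {Module, function} items, the callbacks run in the listed order. A minimal sketch of how such a chain of then callbacks could be folded is shown below. It is not the library's implementation: the modules CallbackChainSketch and DemoCallbacks are hypothetical, and the sketch assumes that each callback sees the state returned by the previous one and that the first {:error, reason} stops the chain.

```elixir
defmodule CallbackChainSketch do
  # Hypothetical runner, NOT Toolbox code. Accepts a single {Module, function}
  # tuple or a list of them and runs them in the given order.
  def run_then(callbacks, transition, instance, msg) do
    callbacks
    |> List.wrap()
    |> Enum.reduce_while({:ok, instance.state}, fn {mod, fun}, {:ok, state} ->
      # Each callback receives the instance carrying the state produced so far.
      case apply(mod, fun, [transition, %{instance | state: state}, msg]) do
        {:ok, new_state} -> {:cont, {:ok, new_state}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end
end

defmodule DemoCallbacks do
  # Illustrative callbacks with the (transition, instance, message) shape.
  def put_cd(_tran, inst, msg), do: {:ok, Map.put(inst.state, :cd, msg.body)}
  def count_insert(_tran, inst, _msg), do: {:ok, Map.update(inst.state, :inserts, 1, &(&1 + 1))}
  def fail(_tran, _inst, _msg), do: {:error, :boom}
end

inst = %{state: %{cd: nil}}
msg = %{body: %{album: "The Wall", artist: "Pink Floyd"}}
callbacks = [{DemoCallbacks, :put_cd}, {DemoCallbacks, :count_insert}]
IO.inspect(CallbackChainSketch.run_then(callbacks, nil, inst, msg))
```

Plain maps stand in for the transition, instance, and message structs so that the sketch runs without Toolbox or Runbox.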
@spec new() :: t()
Creates new blank workflow definition.
Continue with calls to add_transition/2 and build/1 to define a workflow.
@spec new_instance(
  t(),
  status(),
  String.t(),
  String.t(),
  map(),
  Runbox.Message.t(),
  Keyword.t()
) ::
  {:ok, [Runbox.Scenario.OutputAction.oa_params()], Toolbox.Workflow.Instance.t()}
  | {:terminated, [Runbox.Scenario.OutputAction.oa_params()], Toolbox.Workflow.Instance.t()}
  | {:error, :unknown_status}
Creates a new instance for a given workflow.
status is the initial status of the instance. state is its initial state.
type and id may be arbitrary. If your instance represents an asset or incident, you may use its type and id. These two fields are kept in the Toolbox.Workflow.Instance struct and are available in all workflow callbacks, so you can use them e.g. to construct output actions. They don't directly influence the computation, they're just additional information available to the user.
msg is the message that causes this instance to be created. Its timestamp is used as the first timestamp in the instance history.
When a new instance is created, it internally performs a transition from the internal status :none to the given status. You may pass transition callbacks using options (in the same way as to add_transition/2) and these callbacks will be executed on this initial transition. All the callbacks except for when are supported.
The creation of an instance automatically adds the first entry to the instance history and sets the last_update field to the timestamp of msg.
Example
iex> Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil},
...>   %Message{type: :discover_player, timestamp: 100})
{:ok, [],
 %Toolbox.Workflow.Instance{
   id: "Sony D-50",
   type: "cd_player",
   state: %{cd: nil},
   last_update: 100,
   status: "empty",
   history: [%{"status" => "empty", "timestamp" => 100}],
   possible_transitions: [%{"status" => "idle", "timestamp" => -1}],
   next_possible_transition_timestamp: nil,
   terminated?: false
 }}
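The history field in the output above is a list of maps with "status" and "timestamp" string keys. As a small sketch of how a callback might read it, the hypothetical helpers below check which statuses an instance has gone through. InstanceHistorySketch is not part of Toolbox, and the helpers make no assumption about the order in which history entries are stored.

```elixir
defmodule InstanceHistorySketch do
  # Hypothetical helpers over the `history` field shown in the example above.
  # They accept anything with a :history key, so a plain map works as well
  # as a struct.

  # True if the instance has ever been in the given status.
  def visited?(%{history: history}, status) do
    Enum.any?(history, &(&1["status"] == status))
  end

  # All timestamps at which the instance entered the given status,
  # in the order in which they appear in the history list.
  def entered_at(%{history: history}, status) do
    history
    |> Enum.filter(&(&1["status"] == status))
    |> Enum.map(& &1["timestamp"])
  end
end

inst = %{history: [%{"status" => "empty", "timestamp" => 100}]}
IO.inspect(InstanceHistorySketch.visited?(inst, "empty"))
IO.inspect(InstanceHistorySketch.entered_at(inst, "playing"))
```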