Step Flow
Step flow manager for Elixir applications.
Usage
Add step_flow as a dependency in your mix.exs file.
def deps do
  [
    {:step_flow, "~> 0.1.0"}
  ]
end
Then add it to the extra applications:
def application do
  [
    mod: {MyApplication.Application, []},
    extra_applications: [
      :step_flow
    ]
  ]
end
Workflows and Jobs State Machines
The state machines are described in dedicated pages, for workflows and for jobs respectively.
Configurations to start
PostgreSQL configuration
Add the following to your configuration files:
config :step_flow, Ecto.Repo,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  database: "step_flow_dev",
  runtime_pool_size: 10
The values can also be read from environment variables:
config :step_flow, Ecto.Repo,
  hostname: {:system, "DATABASE_HOSTNAME"},
  username: {:system, "DATABASE_USERNAME"},
  password: {:system, "DATABASE_PASSWORD"},
  database: {:system, "DATABASE_NAME"},
  runtime_pool_size: {:system, "DATABASE_POOL_SIZE"}
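Environment variables are always strings, so a numeric setting such as the pool size has to be converted before it is used. The following is a minimal sketch of how a {:system, "NAME"} tuple could be resolved and cast. ConfigSketch and its functions are hypothetical names used for illustration only; they are not part of the StepFlow API, and the way StepFlow itself resolves these tuples is not described here.

```elixir
defmodule ConfigSketch do
  @moduledoc false
  # Hypothetical helper for illustration, not part of StepFlow.

  # A {:system, name} tuple is read from the environment;
  # any other value is returned unchanged.
  def resolve({:system, name}) when is_binary(name), do: System.get_env(name)
  def resolve(value), do: value

  # Resolve a value, then convert it to an integer when it is a string.
  # Returns nil when the environment variable is not set.
  def resolve_integer(value) do
    case resolve(value) do
      nil -> nil
      int when is_integer(int) -> int
      str when is_binary(str) -> String.to_integer(str)
    end
  end
end

System.put_env("DATABASE_POOL_SIZE", "10")
IO.inspect(ConfigSketch.resolve_integer({:system, "DATABASE_POOL_SIZE"}))
```

The same approach would apply to the other numeric settings read from the environment, such as a port number.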
RabbitMQ configuration
Add the following to your configuration files:
config :step_flow, StepFlow.Amqp,
  hostname: "localhost",
  port: 5672,
  username: "guest",
  password: "guest",
  virtual_host: ""
To use TLS for the AMQP connection, add the following configuration fields:
config :step_flow, StepFlow.Amqp,
  hostname: "localhost",
  port: 5672,
  username: "guest",
  password: "guest",
  virtual_host: "",
  scheme: "amqp",
  ssl_cacertfile: '/path/to/ca/cacert.pem',
  ssl_certfile: '/path/to/client/cert.pem',
  ssl_keyfile: '/path/to/client/key.pem',
  # only necessary with intermediate CAs
  ssl_depth: 2,
  ssl_verify: :verify_peer,
  ssl_fail_if_no_peer_cert: true
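The ssl_ fields above share their names with options of Erlang's ssl module (cacertfile, certfile, keyfile, depth, verify, fail_if_no_peer_cert). As an illustration, the sketch below extracts them from a configuration keyword list by dropping the prefix. It assumes the fields are forwarded unchanged once the prefix is removed; TlsOptionsSketch is a hypothetical name, and how StepFlow actually hands these values to the AMQP client is not shown in this document.

```elixir
defmodule TlsOptionsSketch do
  @moduledoc false
  # Hypothetical helper for illustration, not part of StepFlow.

  # Keep only the keys starting with "ssl_" and strip the prefix, so that
  # ssl_cacertfile becomes cacertfile, ssl_verify becomes verify, and so on.
  # The order of the remaining entries is preserved.
  def ssl_options(config) do
    Enum.flat_map(config, fn {key, value} ->
      case Atom.to_string(key) do
        "ssl_" <> name -> [{String.to_atom(name), value}]
        _other -> []
      end
    end)
  end
end

config = [hostname: "localhost", ssl_verify: :verify_peer, ssl_depth: 2]
IO.inspect(TlsOptionsSketch.ssl_options(config))
# prints [verify: :verify_peer, depth: 2]
```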
The values can also be read from environment variables:
config :step_flow, StepFlow.Amqp,
  hostname: {:system, "RABBITMQ_HOSTNAME"},
  port: {:system, "RABBITMQ_PORT"},
  username: {:system, "RABBITMQ_USERNAME"},
  password: {:system, "RABBITMQ_PASSWORD"},
  virtual_host: {:system, "RABBITMQ_VIRTUAL_HOST"},
  scheme: {:system, "RABBITMQ_SCHEME"},
  ssl_cacertfile: {:system, "RABBITMQ_SSL_CACERT_FILE"},
  ssl_certfile: {:system, "RABBITMQ_SSL_CERT_FILE"},
  ssl_keyfile: {:system, "RABBITMQ_SSL_KEY_FILE"},
  ssl_depth: {:system, "RABBITMQ_SSL_DEPTH"},
  ssl_verify: {:system, "RABBITMQ_SSL_VERIFY"},
  ssl_fail_if_no_peer_cert: {:system, "RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT"},
  server_configuration: {:system, "RABBITMQ_SERVER_CONFIGURATION"}
Slack integration (optional)
Configuring the Slack integration enables notifications to a Slack channel. It is used to:
- send a notification when a job is in error
- send a notification from a Step
To enable the integration, update the configuration files:
config :step_flow, StepFlow,
  exposed_domain_name: {:system, "EXPOSED_DOMAIN_NAME"},
  slack_api_token: {:system, "SLACK_API_TOKEN"},
  slack_api_channel: {:system, "SLACK_API_CHANNEL"}
Remarks: slack_api_channel sets the channel that receives notifications for jobs in error.
The message also contains a link to the workflow, so exposed_domain_name must be set to the hostname of the hosted application.
Expose StepFlow REST API
Step Flow provides a REST API to manage Workflows and Jobs.
Create a module with:
defmodule MyApplicationWeb.StepFlow.Plug do
  @moduledoc false
  use StepFlow.Plug
end
Then, in the Phoenix router, forward the API scope to this module:
defmodule MyApplicationWeb.Router do
  use MyApplicationWeb, :router

  pipeline :api do
    plug(:accepts, ["json"])
  end

  scope "/api", MyApplicationWeb do
    pipe_through(:api)

    scope "/step_flow", StepFlow do
      forward("/", Plug)
    end

    # add other routes here
  end
end
To enable authentication checks on the Step Flow API, declare the callbacks in the configuration:
config :step_flow, StepFlow,
  authorize: [
    module: MyApplicationWeb.Authorize,
    get_jobs: [:user_check, :right_technician_check],
    get_workflows: [:user_check, :right_technician_check],
    post_workflows: [:user_check, :right_technician_check],
    put_workflows: [:user_check, :right_technician_check],
    delete_workflows: [:user_check, :right_technician_check],
    post_workflows_events: [:user_check, :right_technician_check],
    get_definitions: [:user_check, :right_technician_check],
    post_worker_definitions: [:user_check, :right_technician_check],
    get_worker_definitions: [:user_check, :right_technician_check],
    get_workflows_statistics: [:user_check]
  ]
Enable StepFlow live notification to WebSocket
StepFlow can send a message on each job update, for example to refresh a UI over a WebSocket. To broadcast these messages, configure the application endpoint:
config :step_flow, StepFlow,
  endpoint: MyApplicationWeb.Endpoint
Configuration of the default working directory for workers
Each worker works with large data files, mostly on shared storage. The default access point needs to be configured with:
config :step_flow, StepFlow,
  workers_work_directory: "/data/mount/point"
Configuration of the default stepflow schema
A workflow follows a schema definition. This definition can be overridden with:
config :step_flow, StepFlow.WorkflowDefinitions.ExternalLoader,
  specification_folder: "./mediacloudai/"

config :step_flow, StepFlow.WorkflowDefinitions.WorkflowDefinition,
  workflow_schema_url: "./mediacloudai/standard/1.11/workflow-definition.schema.json"
Prometheus exporter
Several metrics are defined to monitor Step Flow:
- number of workflows, per identifier, created in the last 24 hours
- execution time, per identifier, of workflows that ended in the last 24 hours
Metrics export is enabled with:
config :step_flow, StepFlow,
  enable_metrics: true # activate metrics
The export format can be selected:
config :prometheus, StepFlow.Metrics.PrometheusExporter,
  format: :auto # or :protobuf, or :text
The export endpoint can optionally be secured with HTTP Basic Authentication:
config :prometheus, StepFlow.Metrics.PrometheusExporter,
  auth: {:basic, "username", "password"}
The metrics time window can be configured with:
config :step_flow, StepFlow.Metrics,
  # "year", "month", "week", "day", "hour", "minute", "second", "millisecond", and "microsecond"
  scale: "day",
  delta: -1
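How scale and delta combine is not spelled out here. One plausible reading, consistent with the "last 24 hours" window of the metrics listed earlier, is that delta counts units of scale relative to the current time, so scale: "day" with delta: -1 starts the window one day ago. The sketch below illustrates that interpretation only. MetricsWindowSketch is a hypothetical name, not part of StepFlow, and only a few fixed-length units are handled.

```elixir
defmodule MetricsWindowSketch do
  @moduledoc false
  # Hypothetical helper for illustration, not part of StepFlow.
  # "year" and "month" are left out because their length varies,
  # and sub-second units are omitted for brevity.

  @seconds_per_unit %{
    "week" => 604_800,
    "day" => 86_400,
    "hour" => 3_600,
    "minute" => 60,
    "second" => 1
  }

  # Shift `now` by `delta` units of `scale`; a negative delta goes back in time.
  def window_start(now, scale, delta) do
    DateTime.add(now, delta * Map.fetch!(@seconds_per_unit, scale), :second)
  end
end

now = ~U[2024-01-02 12:00:00Z]
IO.inspect(MetricsWindowSketch.window_start(now, "day", -1))
# prints ~U[2024-01-01 12:00:00Z]
```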