Multi-Step Actions
View SourceActions in Ash allow you to create sophisticated workflows that coordinate multiple changes or processes. Often business logic crosses multiple resources, and we often want it to be transactional. By leveraging action lifecycle hooks, you can build powerful domain-specific operations. This guide will explore how to build and use multi-step actions using a helpdesk example.
For most use cases, hooks are the preferred approach due to their simplicity and tight integration with Ash's action lifecycle. Reactor is the comprehensive solution for truly complex orchestration scenarios. Additionally, you can write generic actions by hand, implementing an action with fully custom code. Reactors can be used as the run
function for generic actions, giving them first class support in Ash extensions. See below for an example.
When to use hooks vs reactors vs generic actions
You should use hooks for most multi-step workflow scenarios as they provide simplicity and leverage Ash's transactional nature. The key decision point is whether you need compensation/rollback across external services:
Use hooks when:
- Coordinating changes within Ash resources (leverages database transactions)
- Performing side effects that don't require rollback (logging, notifications)
- Working with external services that don't need compensation logic
- Building small-to-medium complexity workflows
Use reactor when:
- You need to compensate/undo changes across multiple external services
- Building complex workflows that require sophisticated error handling and rollback logic
- Coordinating long-running processes that span multiple systems
Use generic actions when:
- You need a high-level action that works on multiple resources, and reactor or hooks are not fitting
- There aren't side effects or external servies
- Short transactional operations that can be understood at a glance
Durable Workflows
For durable workflows, we suggest to use Oban. We provide tools to integrate with Oban in AshOban. AshOban supports very specific types of common workflows, like "triggers" that run periodically for resources, and "scheduled actions" which run generic actions on a cron. You should not be afraid to write "standard" Oban jobs and code where possible. Don't bend over backwards trying to fit everything into AshOban.
Action Lifecycle Hooks
At the core of Ash's multi-step action capability are action lifecycle hooks. These hooks allow you to run code at specific points during an action's execution:
before_transaction: Runs before the transaction is started. Useful for operations that should happen before the transaction, like external API calls.
before_action: Runs in the transaction, before the data layer is called. Perfect for side effects and expensive logic. This hook can be used with changesets and queries.
after_action: Runs in the transaction, after the data layer is called, only if the action is successful. Ideal for transactional side effects that should only happen on success. This hook can be used with changesets and queries.
after_transaction: Runs after the transaction completes, in both success and error cases. Ideal for operations that should happen regardless of the transaction outcome, and for operations that work with external services.
There are other hooks that we won't go into here, as they are rarely used. See the documentation in Ash.Changeset
for more.
- around_action: Runs code both before and after the action logic, within the transaction.
- around_transaction: Runs code both before and after the transaction, outside the transaction.
Examples
Let's explore multi-step actions through a series of increasingly complex examples using a helpdesk system. Each example builds on concepts from the previous ones.
Example 1: Simple Activity Logging
The simplest multi-step action uses a single hook to perform a transactional effect. Here's a basic example that logs ticket creation by inserting an activity log.
defmodule HelpDesk.Changes.LogActivity do
use Ash.Resource.Change
@impl true
def change(changeset, _opts, _context) do
# Log activity after the ticket is successfully created
Ash.Changeset.after_action(changeset, fn _changeset, ticket ->
HelpDesk.ActivityLog.log("Ticket #{ticket.id} created: #{ticket.title}")
{:ok, ticket}
end)
end
end
Use it in your Ticket resource:
actions do
create :create do
accept [:title, :description]
change HelpDesk.Changes.LogActivity
end
end
Example 2: Multi-Hook Ticket Assignment
Building on the first example, let's add ticket assignment logic that uses multiple hooks to coordinate a transactional workflow:
defmodule HelpDesk.Changes.AssignTicket do
use Ash.Resource.Change
@impl true
def change(changeset, _opts, _context) do
changeset
|> Ash.Changeset.before_action(&find_and_assign_agent/1)
|> Ash.Changeset.after_action(¬ify_assignment/2)
end
defp find_and_assign_agent(changeset) do
case HelpDesk.AgentManager.find_available_agent() do
{:ok, agent} ->
changeset
|> Ash.Changeset.force_change_attribute(:agent_id, agent.id)
|> Ash.Changeset.force_change_attribute(:status, "assigned")
|> Ash.Changeset.put_context(:assigned_agent, agent)
{:error, reason} ->
Ash.Changeset.add_error(changeset, "No agents available: #{reason}")
end
end
defp notify_assignment(changeset, ticket) do
HelpDesk.Notifications.notify_assignment(changeset.context[:agent], ticket)
{:ok, ticket}
end
end
Example 3: Complex Workflow with External Services
This example shows a sophisticated workflow that interacts with external services and handles various error conditions:
defmodule HelpDesk.Changes.ProcessUrgentTicket do
use Ash.Resource.Change
@impl true
def change(changeset, _opts, _context) do
changeset
# uses before_transaction as it communicates with an external service
# and we don't want to keep a transaction longer than necessary
|> Ash.Changeset.before_transaction(&validate_external_services/1)
# Prepare for processing transactionally
|> Ash.Changeset.before_action(&prepare_urgent_processing/1)
# Complete the workflow transactionally
|> Ash.Changeset.after_action(&complete_urgent_workflow/2)
# Perform success or failure logic after the transaction
|> Ash.Changeset.after_transaction(&cleanup_and_notify/2)
end
defp validate_external_services(changeset) do
# Check external services before starting transaction
case HelpDesk.ExternalServices.health_check() do
:ok ->
changeset
{:error, service} ->
Ash.Changeset.add_error(changeset,
message: "External service #{service} unavailable for urgent processing")
end
end
defp prepare_urgent_processing(changeset) do
priority = Ash.Changeset.get_attribute(changeset, :priority)
if priority == "urgent" do
# Reserve resources for urgent processing
case HelpDesk.ResourceManager.reserve_urgent_slot() do
{:ok, slot_id} ->
changeset
|> Ash.Changeset.force_change_attribute(:status, "urgent_processing")
|> Ash.Changeset.force_change_attribute(:processing_slot_id, slot_id)
|> Ash.Changeset.put_context(:reserved_slot, slot_id)
{:error, :no_slots_available} ->
# Fallback to normal priority with notification
changeset
|> Ash.Changeset.force_change_attribute(:priority, "high")
|> Ash.Changeset.put_context(:priority_downgraded, true)
end
else
changeset
end
end
defp complete_urgent_workflow(_changeset, ticket) do
if ticket.status == "urgent_processing" do
# Create escalation path and update external tracking
with {:ok, _escalation} <- create_escalation_path(ticket),
{:ok, _external_ref} <- HelpDesk.ExternalServices.create_urgent_case(ticket) do
# Update ticket with external reference
ticket
|> Ash.Changeset.for_update(:add_external_reference, %{
external_ref: "URG-#{ticket.id}-#{System.system_time(:second)}"
})
|> HelpDesk.Ticket.update!()
else
{:error, reason} ->
# Handle errors gracefully
HelpDesk.Logger.error("Failed to complete urgent workflow for ticket #{ticket.id}: #{inspect(reason)}")
{:ok, ticket}
end
end
{:ok, ticket}
end
defp cleanup_and_notify({:ok, ticket}, changeset) do
# Success case - perform cleanup and notifications
# Any more complex cleanup than this should be
# pushing you to consider reactor and/or oban
if slot_id = Ash.Changeset.get_context(changeset, :reserved_slot) do
HelpDesk.ResourceManager.release_slot(slot_id)
end
if Ash.Changeset.get_context(changeset, :priority_downgraded) do
HelpDesk.Notifications.notify_priority_downgrade(ticket)
end
HelpDesk.Metrics.increment_urgent_tickets()
{:ok, ticket}
end
defp cleanup_and_notify({:error, _reason} = error, changeset) do
# Error case - clean up resources
if slot_id = Ash.Changeset.get_context(changeset, :reserved_slot) do
HelpDesk.ResourceManager.release_slot(slot_id)
end
error
end
defp create_escalation_path(ticket) do
HelpDesk.Escalation
|> Ash.Changeset.for_create(:create, %{
ticket_id: ticket.id,
level: 1,
escalated_at: DateTime.utc_now()
})
|> HelpDesk.Escalation.create()
end
end
These examples demonstrate the progression from simple logging to complex cross-resource workflows. Each example builds on the previous concepts while introducing new patterns and considerations for multi-step actions.
Shortcuts for hooks
There are multiple ways to add hooks to actions. What we showed above, defining an Ash.Resource.Change
module, and using the functions in Ash.Changeset
to add callbacks to the module is the most organized & idiomatic way. However, you can also use the following techniques as shorthand. In general, prefer to write your changes into modules to keep your resources clean and keep compile times down (more smaller modules is typically better than fewer larger modules).
Anonymous Function Changes
Here we use an anonymous function change, which is a shorthand for defining a change module with a single function. Just like in the change/3
function, you can add hooks to the changeset.
create :open_and_assign do
change fn changeset, context ->
Ash.Changeset.after_action(changeset, fn changeset, result ->
...
end)
end
end
Builtin Hook Changes
If you know that you just want to add a single hook to an action, you can use some of the functions in Ash.Resource.Change.Builtins
, which are simple shorthands for the above form. For example:
create :open_and_assign do
change after_action(changeset, result, context ->
...
end)
end
Notice how the anonymous function takes an extra argument. In the first format above, the context
value came from change fn changeset, context ->
, but in this format, it is provided as an argument at the end of the builtin function, i.e change after_action(changeset, result, context -> .
Batch Callbacks
When working with bulk actions (like Ash.bulk_create/3
, Ash.bulk_update/3
, etc.), you can optimize your changes by implementing batch callbacks. These allow you to process multiple changesets together, which can be much more efficient than processing them individually.
Understanding Batch Callbacks
Batch callbacks are optional methods you can implement in your change modules:
batch_change/3
: Replaceschange/3
for batch operations, receiving a list of changesetsbefore_batch/3
: Runs before the batch is sent to the data layerafter_batch/3
: Runs after the batch completes, receiving changeset-result pairs
Batch Operations and Transactions
The batch logic is currently all within transactions. There is no before_batch_transaction
or after_batch_transaction
hooks yet.
When are batch callbacks used?
batch_change/3
must be defined forbefore_batch/3
andafter_batch/3
to be called- The exception is
after_batch/3
, which is also called after atomic changes whenatomic/3
is defined before_batch/3
is ignored when calling changes atomically- If you define
batch_change/3
, you can omitchange/3
entirely
Example: Batch Processing with External API
Here's an example that demonstrates efficient batch processing for ticket creation with external service integration:
defmodule HelpDesk.Changes.BatchNotifyExternalSystem do
use Ash.Resource.Change
@impl true
def batch_change(changesets, _opts, _context) do
# Process all changesets together - you can do expensive setup here
# that would be wasteful to repeat for each individual changeset
Enum.map(changesets, &prepare_for_external_notification/1)
end
@impl true
def before_batch(changesets, opts, context) do
# Validate external service availability before processing the batch
case HelpDesk.ExternalAPI.health_check() do
:ok ->
# Mark all changesets as ready for external notification
Enum.map(changesets, fn changeset ->
Ash.Changeset.put_context(changeset, :external_api_ready, true)
end)
{:error, reason} ->
# Add errors to all changesets if external service is down
Enum.map(changesets, fn changeset ->
Ash.Changeset.add_error(changeset,
message: "External API unavailable: #{reason}")
end)
end
end
@impl true
def after_batch(changesets_and_results, opts, context) do
# Efficiently batch notify external system
notifications =
changesets_and_results
|> Enum.filter(fn {changeset, _result} ->
Ash.Changeset.get_context(changeset, :external_api_ready, false)
end)
|> Enum.map(fn {_changeset, result} ->
%{
ticket_id: result.id,
title: result.title,
created_at: result.inserted_at
}
end)
# Single API call for entire batch instead of one per ticket
case HelpDesk.ExternalAPI.batch_notify_tickets(notifications) do
{:ok, _response} ->
# Return :ok to indicate success
:ok
{:error, error} ->
# You can return individual errors or notifications
Enum.map(changesets_and_results, fn {_changeset, result} ->
{:error, Ash.Error.Invalid.exception(
message: "Failed to notify external system for ticket #{result.id}: #{error}"
)}
end)
end
end
defp prepare_for_external_notification(changeset) do
# Add any metadata needed for external notification
changeset
|> Ash.Changeset.put_context(:needs_external_notification, true)
end
end
Example: Optimized Database Operations
This example shows how to use batch callbacks to optimize database operations:
defmodule HelpDesk.Changes.BatchAssignAgents do
use Ash.Resource.Change
@impl true
def before_batch(changesets, _opts, _context) do
# Pre-load all available agents once for the entire batch
available_agents =
HelpDesk.Agent
|> Ash.Query.filter(status == "available")
|> Ash.Query.sort(:workload)
|> Ash.read!()
# Distribute agents across the batch
{assigned_changesets, _remaining_agents} =
Enum.map_reduce(changesets, available_agents, fn changeset, [agent | rest] ->
updated_changeset =
changeset
|> Ash.Changeset.change_attribute(:agent_id, agent.id)
|> Ash.Changeset.change_attribute(:status, "assigned")
|> Ash.Changeset.put_context(:assigned_agent, agent)
{updated_changeset, rest ++ [%{agent | workload: agent.workload + 1}]}
end)
assigned_changesets
end
@impl true
def after_batch(changesets_and_results, _opts, _context) do
# Batch update agent workloads
agent_updates =
changesets_and_results
|> Enum.map(fn {changeset, _result} ->
Ash.Changeset.get_context(changeset, :assigned_agent)
end)
|> Enum.filter(& &1)
|> Enum.group_by(& &1.id)
|> Enum.map(fn {agent_id, assignments} ->
%{id: agent_id, workload_increment: length(assignments)}
end)
# Single bulk operation to update all agent workloads
HelpDesk.Agent
|> Ash.bulk_update(:increment_workload, agent_updates)
:ok
end
# Define batch_change/3 to enable batch callbacks
@impl true
def batch_change(changesets, _opts, _context) do
# The actual changeset modifications happen in before_batch
# This just returns the changesets as-is
changesets
end
end
Example: Conditional Batch Processing
You can control when batch callbacks are used with the batch_callbacks?/3
callback:
defmodule HelpDesk.Changes.ConditionalBatchProcessing do
use Ash.Resource.Change
@impl true
def batch_callbacks?(changesets, opts, context) do
# Only use batch processing for large batches
length(changesets) >= 10
end
@impl true
def change(changeset, opts, context) do
# This runs for individual changes or small batches
changeset
|> perform_individual_processing(opts, context)
end
@impl true
def batch_change(changesets, opts, context) do
# This runs for large batches (10+ items)
changesets
|> perform_optimized_batch_processing(opts, context)
end
defp perform_individual_processing(changeset, _opts, _context) do
# Simple processing for individual items
changeset
end
defp perform_optimized_batch_processing(changesets, _opts, _context) do
# Optimized processing for large batches
changesets
end
end
Best Practices for Batch Callbacks
- Use batch callbacks for expensive operations: Database queries, external API calls, file I/O
- Keep individual operations fast: If your change is already fast, batch callbacks may not be worth the complexity
- Handle errors gracefully: Return appropriate error tuples from
after_batch/3
when things go wrong - Test both paths: Ensure your change works correctly both individually and in batches
Generic Action Example
# Define a plain-old elixir module/function to express the action
defmodule HelpDesk.Actions.AssignTicket do
def run(input, context) do
with {:ok, agent} <- HelpDesk.AgentManager.find_available_agent(),
{:ok, ticket} <- HelpDesk.get_ticket_by_id(input.arguments.ticket_id),
{:ok, ticket} <- HelpDesk.update_ticket(ticket, %{agent_id: agent.id, status: :assigned}, actor: input.actor)
:ok <- Helpdesk.Notifications.notify_assignment(agent, ticket)
end
end
end
# Invoke the action from Resource
actions do
action :assign_to_available_agent do
transaction? true
argument :ticket_id, :uuid
run HelpDesk.Actions.AssignTicket
end
end