Persistence & Data
The Transaction, Command, and EventAccumulator effects handle transactions, mutation dispatch, and domain event collection. They work together to support patterns from simple CRUD through to event-sourced domain logic.
Database persistence is handled via domain-specific
Port.Contract modules (e.g. UserRepo,
OrderRepo), which provide typed boundaries and swappable handlers.
For common Ecto Repo operations (insert, update, delete, get, etc.),
Skuld provides a built-in Port.Repo contract so you don't
need to redeclare identical boilerplate in every domain.
Transaction is orthogonal — it wraps any computation in transactional semantics, rolling back env state on failure with optional database transaction support.
For read queries and external service calls, see External Integration. For batchable queries with automatic N+1 prevention, see Query & Batching.
Port.Repo
A built-in Port contract providing standard Ecto Repo operations. Use it when your effectful code needs generic persistence (insert, update, delete, get, all, etc.) without defining a domain-specific contract for each operation.
Operations
Writes return {:ok, struct()} | {:error, Ecto.Changeset.t()}, with auto-generated bang variants:
- insert(changeset) / insert!(changeset)
- update(changeset) / update!(changeset)
- delete(record) / delete!(record)
Bulk operations return {count, nil | list}:
- update_all(queryable, updates, opts)
- delete_all(queryable, opts)
Reads follow Ecto's conventions:
- get(queryable, id) / get!(queryable, id)
- get_by(queryable, clauses) / get_by!(queryable, clauses)
- one(queryable) / one!(queryable)
- all(queryable)
- exists?(queryable)
- aggregate(queryable, aggregate, field)
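The return shapes listed above can be consumed with ordinary pattern matching. The following is a minimal sketch in plain Elixir that calls neither Skuld nor Ecto; the module name ResultShapes, its describe/1 function, and the stand-in values are hypothetical and exist only to illustrate the shapes.

```elixir
defmodule ResultShapes do
  # Hypothetical helper: classifies a value bound from a Port.Repo call.

  # Write operations yield {:ok, struct} | {:error, changeset}.
  def describe({:ok, record}), do: {:written, record}
  def describe({:error, changeset}), do: {:invalid, changeset}

  # Bulk operations yield {count, nil | list}.
  def describe({count, nil}) when is_integer(count), do: {:bulk, count, []}

  def describe({count, rows}) when is_integer(count) and is_list(rows),
    do: {:bulk, count, rows}
end

# A successful write, using a plain map as a stand-in for a schema struct.
IO.inspect(ResultShapes.describe({:ok, %{id: 1}}))

# A bulk operation that returned no rows.
IO.inspect(ResultShapes.describe({2, nil}))
```

The bang variants skip the tuple for writes: as in the usage example that follows, insert! binds the struct directly.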
Usage in computations
alias Skuld.Effects.Port.Repo

defcomp create_and_fetch(attrs) do
  changeset = User.changeset(%User{}, attrs)
  user <- Repo.insert!(changeset)
  all_users <- Repo.all(User)
  {user, all_users}
end
Production handler (Port.Repo.Ecto)
Generate a behaviour implementation that delegates to your Ecto Repo:
defmodule MyApp.Repo.Ecto do
  use Skuld.Effects.Port.Repo.Ecto, repo: MyApp.Repo
end
Wire it into the handler stack:
create_and_fetch(attrs)
|> Transaction.transact()
|> Transaction.Ecto.with_handler(MyApp.Repo)
|> Port.with_handler(%{Repo.Contract => MyApp.Repo.Ecto})
|> Throw.with_handler()
|> Comp.run!()
Test handler (Port.Repo.Test)
A stateless test handler. Repo.Test.new/1 returns a fn resolver
that applies changeset changes for writes and delegates reads to an
optional fallback_fn (or raises). Register it via Port.with_handler
and use the :log option to capture a dispatch log. Each log entry is
a 4-tuple {module, operation, args_list, return_value}:
alias Skuld.Effects.Port
alias Skuld.Effects.Port.Repo

cs = User.changeset(%User{}, %{name: "Alice"})
alice = %User{id: 42, name: "Alice"}

{result, log} =
  comp do
    user <- Repo.insert!(cs)
    found <- Repo.get(User, 42)
    {user, found}
  end
  |> Port.with_handler(
    %{Repo => Repo.Test.new(
      fallback_fn: fn :get, [User, 42] -> alice end
    )},
    # Note: Repo.Test fallback is 2-arity (stateless).
    # Repo.InMemory fallback is 3-arity (receives store state).
    log: true,
    output: fn result, state -> {result, state.log} end
  )
  |> Throw.with_handler()
  |> Comp.run!()

assert {%User{name: "Alice"}, ^alice} = result

assert [
  {Repo, :insert, [^cs], {:ok, %User{name: "Alice"}}},
  {Repo, :get, [User, 42], ^alice}
] = log
Because logging happens at the Port level, the log captures all
Port dispatches — not just Port.Repo operations. Register
additional contracts in the same registry map and their dispatches
appear in the same log:
Port.with_handler(
  comp,
  %{Repo => Repo.Test.new(), MyApp.Queries => MyApp.Queries.TestImpl},
  log: true,
  output: fn r, state -> {r, state.log} end
)
Combining with domain-specific contracts
Port.Repo handles generic persistence. Domain-specific operations (complex queries, business logic wrapped in persistence) should still use their own Port.Contract:
defmodule MyApp.Orders do
  use HexPort.Contract

  defport place_order(params :: map()) ::
    {:ok, Order.t()} | {:error, term()}
end

defcomp checkout(params) do
  order <- MyApp.Orders.place_order!(params)
  audit_cs = AuditLog.changeset(%AuditLog{}, %{action: "checkout", order_id: order.id})
  _ <- Repo.insert!(audit_cs)
  order
end
When to use Port.Repo vs a domain contract
| Situation | Use |
|---|---|
| Generic insert/update/delete/get | Port.Repo |
| Domain-specific queries | Domain Port.Contract |
| Business logic in persistence | Domain Port.Contract |
| Audit logs, simple CRUD | Port.Repo |
Transaction
Transactional semantics for computations: on normal completion the transaction commits (env state preserved); on explicit rollback or sentinel (Throw, Suspend, etc.) the transaction rolls back env state to pre-transaction values.
Transaction is orthogonal to persistence. A computation may need transactional env state rollback without any database involvement (e.g. rolling back Writer accumulations on error), or it may combine transactions with domain-specific persistence Ports.
Basic usage
comp do
  result <- Transaction.transact(comp do
    user <- UserRepo.create_user!(params)
    order <- OrderRepo.create_order!(%{user_id: user.id, items: items})
    {user, order}
  end)
  result
end
|> Transaction.Ecto.with_handler(MyApp.Repo)
|> Port.with_handler(%{UserRepo => UserRepo.Ecto, OrderRepo => OrderRepo.Ecto})
|> Comp.run!()
Operations
- Transaction.transact(comp) - wrap a computation in a transaction
- Transaction.rollback(reason) - explicitly roll back the current transaction
Explicit rollback
comp do
  result <- Transaction.transact(comp do
    _ <- Transaction.rollback(:validation_failed)
    :never_reached
  end)
  result
end
|> Transaction.Noop.with_handler()
|> Comp.run!()
#=> {:rolled_back, :validation_failed}
try_transact
A convenience that wraps the outcome in {:ok, result} or
{:rolled_back, reason} for easy pattern matching:
comp do
  # try_transact returns a computation, so bind its outcome before matching on it
  outcome <- Transaction.try_transact(inner_comp)

  case outcome do
    {:ok, value} -> handle_success(value)
    {:rolled_back, reason} -> handle_rollback(reason)
  end
end
Throws inside transactions
Throws automatically trigger rollback:
comp do
  result <- Transaction.transact(comp do
    _ <- Throw.throw(:something_went_wrong)
    :never_reached
  end)
  result
end
|> Transaction.Ecto.with_handler(MyApp.Repo)
|> Throw.with_handler()
|> Comp.run!()
# Transaction is rolled back, throw propagates
Nested transactions
Nested transact calls create savepoints (Ecto handler) or
independent rollback scopes (Noop handler):
comp do
  result <- Transaction.transact(comp do
    _ <- do_outer_work()
    inner <- Transaction.transact(comp do
      _ <- do_inner_work()
      :inner_done
    end)
    {:outer_done, inner}
  end)
  result
end
Handlers
Transaction.Ecto.with_handler(repo, opts) — Production handler.
Wraps the computation in Repo.transaction/2 with env state rollback
on failure. Nested transactions use savepoints.
Options:
- :preserve_state_on_rollback - list of state keys to keep on rollback (e.g. metrics, error counters)
Transaction.Noop.with_handler(opts) — No-op handler. Env state
rollback without any database. Useful for testing code that needs
transactional semantics without a database connection.
Options:
- :preserve_state_on_rollback - same as for the Ecto handler
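One way to picture :preserve_state_on_rollback is as a snapshot restore that skips the listed keys. The sketch below models env state as a plain map. It illustrates the behaviour described above and is not Skuld's implementation; the RollbackSketch module, its rollback/3 function, and the :cart and :metrics keys are all hypothetical.

```elixir
defmodule RollbackSketch do
  # Restore the pre-transaction snapshot, but carry over the current
  # values of any preserved keys (e.g. metrics or error counters).
  def rollback(snapshot, current, preserved_keys) do
    Map.merge(snapshot, Map.take(current, preserved_keys))
  end
end

# State captured when the transaction started.
snapshot = %{cart: [], metrics: %{attempts: 0}}

# State at the moment the transaction failed.
current = %{cart: ["widget"], metrics: %{attempts: 1}}

# With no preserved keys, everything reverts to the snapshot.
IO.inspect(RollbackSketch.rollback(snapshot, current, []))
# => %{cart: [], metrics: %{attempts: 0}}

# With :metrics preserved, the attempt counter survives the rollback.
IO.inspect(RollbackSketch.rollback(snapshot, current, [:metrics]))
# => %{cart: [], metrics: %{attempts: 1}}
```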
Testing
For tests that don't need a real database, use the Noop handler:
comp do
  result <- Transaction.transact(comp do
    # Port calls are stubbed via Port.with_test_handler
    user <- UserRepo.create_user!(params)
    user
  end)
  result
end
|> Transaction.Noop.with_handler()
|> Port.with_test_handler(%{
  UserRepo.__key__(:create_user, params) => %User{id: "test-id", name: "Alice"}
})
|> Comp.run!()
Command
Dispatch mutation structs through a handler function. Command provides a clean separation between "what mutation to perform" and "how to perform it", routing through pattern matching.
Basic usage
Define command structs and a handler that routes them:
defmodule CreateTodo do
  defstruct [:title, :priority]
end

defmodule DeleteTodo do
  defstruct [:id]
end

defmodule MyCommandHandler do
  use Skuld.Syntax

  def handle(%CreateTodo{title: title, priority: priority}) do
    comp do
      id <- Fresh.fresh_uuid()
      {:ok, %{id: id, title: title, priority: priority}}
    end
  end

  def handle(%DeleteTodo{id: id}) do
    comp do
      {:ok, %{deleted: id}}
    end
  end
end
Execute commands through the effect system:
comp do
  {:ok, todo} <- Command.execute(%CreateTodo{title: "Buy milk", priority: :high})
  todo
end
|> Command.with_handler(&MyCommandHandler.handle/1)
|> Fresh.with_uuid7_handler()
|> Comp.run!()
#=> %{id: "01945a3b-...", title: "Buy milk", priority: :high}
Handler
Command.with_handler(handler_fn)
The handler function receives a command struct and returns a computation. This means commands can use other effects internally: Fresh for IDs, Port contracts for persistence, EventAccumulator for domain events.
Why Command?
Command is a building block for the Decider pattern: decide (interpret commands) -> evolve (apply events to state) -> persist. Combined with EventAccumulator, you can express event-sourced domain logic as pure effectful code. See the Decider Pattern recipe for a full walkthrough.
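The decide and evolve steps can be pictured with plain functions. The following is a minimal sketch in plain Elixir, not the recipe's code: the TodoDecider module, the tuple-shaped commands and events, and the map-based state are hypothetical. In Skuld terms, decide would sit in a Command handler and the events would be collected with EventAccumulator.emit.

```elixir
defmodule TodoDecider do
  # Hypothetical command, event, and state shapes, for illustration only.

  # decide/2 interprets a command against the current state and returns
  # the list of events that should happen (possibly none).
  def decide({:create_todo, title}, todos) do
    if Map.has_key?(todos, title), do: [], else: [{:todo_created, title}]
  end

  def decide({:delete_todo, title}, todos) do
    if Map.has_key?(todos, title), do: [{:todo_deleted, title}], else: []
  end

  # evolve/2 applies a single event to the state.
  def evolve(todos, {:todo_created, title}), do: Map.put(todos, title, :open)
  def evolve(todos, {:todo_deleted, title}), do: Map.delete(todos, title)

  # One decide -> evolve step. Persisting the events is left to the caller.
  def step(command, todos) do
    events = decide(command, todos)
    new_state = Enum.reduce(events, todos, fn event, acc -> evolve(acc, event) end)
    {new_state, events}
  end
end

{state, events} = TodoDecider.step({:create_todo, "Buy milk"}, %{})
IO.inspect({state, events})
```

Keeping decide free of side effects means the same command can be replayed against any state in a test without handlers or a database.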
EventAccumulator
Accumulate domain events during a computation. Built on Writer, it provides a focused API for collecting events that can be persisted or published after the computation completes.
Basic usage
comp do
  _ <- EventAccumulator.emit(%{type: :user_created, id: 1})
  _ <- EventAccumulator.emit(%{type: :email_sent, to: "user@example.com"})
  :ok
end
|> EventAccumulator.with_handler(output: fn result, events -> {result, events} end)
|> Comp.run!()
#=> {:ok, [%{type: :user_created, id: 1}, %{type: :email_sent, to: "user@example.com"}]}
Operations
- EventAccumulator.emit(event) - add an event to the accumulator
Handler
EventAccumulator.with_handler(opts)
Options:
- :output - fn result, events -> transformed_result end, used to extract the accumulated events alongside the computation result
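Because :output is an ordinary two-argument function, it can also reshape the events before returning them. A minimal sketch in plain Elixir, assuming events are maps with a :type key as in the basic usage; the grouping is a hypothetical choice, and the function would be passed as EventAccumulator.with_handler(output: output).

```elixir
# Hypothetical :output function: pairs the result with events grouped by :type.
output = fn result, events ->
  {result, Enum.group_by(events, fn event -> event.type end)}
end

events = [
  %{type: :user_created, id: 1},
  %{type: :email_sent, to: "user@example.com"}
]

# Called directly here, standing in for what the handler would do on completion.
IO.inspect(output.(:ok, events))
```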
Combining with Command and Port
All three effects compose naturally for domain workflows:
defmodule PlaceOrderHandler do
  use Skuld.Syntax

  def handle(%PlaceOrder{user_id: uid, items: items}) do
    comp do
      order <- OrderRepo.create_order!(%{
        user_id: uid, items: items, status: :placed
      })
      _ <- EventAccumulator.emit(%OrderPlaced{
        order_id: order.id, user_id: uid, items: items
      })
      {:ok, order}
    end
  end
end

{result, events} =
  comp do
    {:ok, order} <- Command.execute(%PlaceOrder{
      user_id: "u1", items: ["widget"]
    })
    order
  end
  |> Command.with_handler(&PlaceOrderHandler.handle/1)
  |> EventAccumulator.with_handler(output: fn r, evts -> {r, evts} end)
  |> Port.with_handler(%{OrderRepo => OrderRepo.Ecto})
  |> Comp.run!()

# result is the order, events contains [%OrderPlaced{...}]
# Publish events to your event bus, persist to an event store, etc.