Effects: Persistence & Data
Skuld splits database interaction across several effects, each handling a distinct concern:
- DB: write operations (insert, update, upsert, delete, bulk variants) and transaction management through a single effect with swappable handlers (Ecto for production, Test for stubbing and call recording, Noop for transaction-only tests).
- Port: abstracts read queries (and any other blocking call to external code) behind a dispatch layer with pluggable backends. Ecto's query language is too rich to wrap in an effect, so reads are parameterised function calls routed through Port, making them easy to stub in tests. Port.Contract adds typed contracts via defport, and Port.Provider enables the reverse direction: plain code calling into effectful implementations.
- Query.Contract: a typed DSL for batchable queries, solving the N+1 problem. deffetch declarations generate operation structs, typed caller functions, an Executor behaviour, and wiring helpers. Batch-reads suspend the current FiberPool fiber; when the run queue empties, the scheduler groups pending reads by query type and executes a single batched query per group, distributing results back to the waiting fibers. The programmer writes simple per-record fetch calls and gets automatic batching for free, with full type safety and LSP completion.
- Command and EventAccumulator: building blocks for the Decider pattern. Command dispatches mutation structs through a handler that returns a computation; EventAccumulator accumulates domain events via Writer. Together they let you express decide -> evolve -> persist pipelines as pure effectful code.
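The batching behaviour described for Query.Contract can be pictured in plain Elixir, without Skuld. The sketch below is only an illustration under stated assumptions: the BatchingSketch module, the {query_type, key, waiter} tuple shape, and the run_batch/2 stand-in are hypothetical and not part of Skuld's API. It groups pending reads by query type, runs one batched lookup per group, and hands each waiter its own result.

```elixir
defmodule BatchingSketch do
  # Hypothetical stand-in for a batched query: one call answers many keys.
  # A real executor would issue a single query covering all keys here.
  def run_batch(:get_user, ids) do
    Map.new(ids, fn id -> {id, %{id: id, name: "user-" <> id}} end)
  end

  def run_batch(:get_org, ids) do
    Map.new(ids, fn id -> {id, %{id: id, kind: :org}} end)
  end

  # `pending` is a list of {query_type, key, waiter} tuples, one per suspended read.
  # Returns {results, batch_count}, where results maps each waiter to its record.
  def flush(pending) do
    groups = Enum.group_by(pending, fn {type, _key, _waiter} -> type end)

    results =
      groups
      |> Enum.flat_map(fn {type, reads} ->
        # One batched call per query type, with duplicate keys removed.
        keys = reads |> Enum.map(fn {_type, key, _waiter} -> key end) |> Enum.uniq()
        batch = run_batch(type, keys)
        # Distribute each record back to the waiter that asked for it.
        Enum.map(reads, fn {_type, key, waiter} -> {waiter, Map.get(batch, key)} end)
      end)
      |> Map.new()

    {results, map_size(groups)}
  end
end

pending = [
  {:get_user, "1", :fiber_a},
  {:get_user, "2", :fiber_b},
  {:get_org, "9", :fiber_c},
  {:get_user, "1", :fiber_d}
]

{results, batch_count} = BatchingSketch.flush(pending)
```

Here four per-record reads collapse into two batched calls, one per query type, and every waiter still receives exactly the record it requested.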
DB
Unified database writes and transactions as effects (requires Ecto). The DB effect
provides single operations (insert, update, upsert, delete), bulk operations
(insert_all, update_all, upsert_all, delete_all), and transaction management through
a single effect signature with swappable handlers.
Single write operations
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.DB

# Production: real Ecto operations
comp do
  user <- DB.insert(User.changeset(%User{}, %{name: "Alice"}))
  order <- DB.insert(Order.changeset(%Order{}, %{user_id: user.id}))
  {user, order}
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

Operations accept changesets or ChangeEvent structs:
use Skuld.Syntax
alias Skuld.Effects.{DB, ChangeEvent}

comp do
  user <- DB.insert(changeset)                      # from changeset
  user <- DB.insert(ChangeEvent.insert(changeset))  # from ChangeEvent
  user <- DB.update(User.changeset(user, %{name: "Bob"}))
  user <- DB.upsert(changeset, conflict_target: :email)
  {:ok, _} <- DB.delete(user)
  user
end

Bulk operations
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.DB

comp do
  {count, users} <- DB.insert_all(User, changesets, returning: true)
  {count, nil} <- DB.update_all(User, changesets)
  {count, nil} <- DB.delete_all(User, structs)
  {count, users}
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

Transactions with automatic commit/rollback
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.DB

# Normal completion - transaction commits
comp do
  result <- DB.transact(comp do
    user <- DB.insert(user_changeset)
    order <- DB.insert(order_changeset)
    {user, order}
  end)
  result
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

# Explicit rollback
comp do
  result <- DB.transact(comp do
    _ <- DB.rollback(:validation_failed)
    :never_reached
  end)
  result
end
|> DB.Noop.with_handler()
|> Comp.run!()
#=> {:rolled_back, :validation_failed}

Throws inside a transaction automatically trigger rollback:
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.{DB, Throw}

comp do
  result <- DB.transact(comp do
    _ <- Throw.throw(:something_went_wrong)
    :never_reached
  end)
  result
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Throw.with_handler()
|> Comp.run!()
# Transaction is rolled back, throw propagates

Handlers
The same domain code works with different handlers:
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.DB

create_order = fn user_id, items ->
  comp do
    result <- DB.transact(comp do
      order = %{id: 1, user_id: user_id, items: items}
      order
    end)
    result
  end
end

# Production: real Ecto transactions
create_order.(123, [:item_a, :item_b])
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

# Testing: no-op transactions (raises on writes)
create_order.(123, [:item_a, :item_b])
|> DB.Noop.with_handler()
|> Comp.run!()

# Testing: stub writes and record calls
{result, calls} =
  comp do
    user <- DB.insert(User.changeset(%User{}, %{name: "Alice"}))
    user
  end
  |> DB.Test.with_handler(&DB.Test.default_handler/1)
  |> Comp.run!()
#=> {%User{name: "Alice"}, [{:insert, %Ecto.Changeset{...}}]}

The test handler records all operations via Writer and returns stubbed results:
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.DB

# Custom handler for specific test scenarios
{result, calls} =
  comp do
    user <- DB.insert(changeset)
    user
  end
  |> DB.Test.with_handler(fn
    %DB.Insert{input: _cs} -> %User{id: "test-id", name: "Stubbed"}
    %DB.Update{input: cs} -> Ecto.Changeset.apply_changes(cs)
  end)
  |> Comp.run!()
#=> {%User{id: "test-id", name: "Stubbed"}, [{:insert, %Ecto.Changeset{...}}]}

Query.Contract
Typed batchable queries using FiberPool. deffetch declarations generate operation
structs, typed caller functions, an Executor behaviour, dispatch, and wiring helpers.
Multiple concurrent query calls are automatically batched, solving the N+1 problem:
defmodule MyApp.Queries.Users do
  use Skuld.Query.Contract

  deffetch get_user(id :: String.t()) :: User.t() | nil
  deffetch get_users_by_org(org_id :: String.t()) :: [User.t()]
end

The programmer writes simple per-record query calls. When multiple fibers make the same type of query concurrently, they are automatically batched into a single executor invocation:
FiberPool.map(["1", "2", "3"], &MyApp.Queries.Users.get_user/1)
|> MyApp.Queries.Users.with_executor(MyApp.Queries.Users.EctoExecutor)
|> FiberPool.with_handler()
|> Comp.run!()
# All 3 get_user calls batched into a single executor invocation

Queries can be wrapped with Skuld.Query.Cache for automatic cross-batch result
caching and within-batch request deduplication:
alias Skuld.Query.Cache, as: QueryCache

FiberPool.map(["1", "2", "3"], &MyApp.Queries.Users.get_user/1)
|> QueryCache.with_executor(MyApp.Queries.Users, MyApp.Queries.Users.EctoExecutor)
|> FiberPool.with_handler()
|> Comp.run!()

See Query documentation for the full API:
the query macro, deffetch contract definition, executor implementation,
wiring, bulk wiring, caching, bang variants, introspection, and testing patterns.
See also Concurrency effects - Brook I/O Batching for automatic batching with nested reads across concurrent fibers.
Port
Port abstracts read queries and other blocking calls behind a dispatch layer with
pluggable backends, making them easy to stub in tests. Port.Contract adds typed
contracts via defport declarations with Consumer/Provider behaviour generation,
and Port.Provider enables the reverse direction — plain code calling into effectful
implementations.
See Port documentation for the full API: low-level
Port.request, typed Port.Contract, provider-side Port.Provider, handler
types, and testing patterns.
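To make the idea of "reads as parameterised function calls routed through a dispatch layer" concrete, here is a minimal plain-Elixir sketch. It does not use Skuld and does not reflect the real Port.request signature; DispatchSketch, UserReads, and the :direct and {:stub, responses} backends are hypothetical names chosen for illustration. The point is that the calling code stays the same while the backend decides whether the real function runs or a canned answer is returned.

```elixir
defmodule UserReads do
  # Hypothetical read function; a real one would run an Ecto query.
  def find_by_email(email), do: %{email: email, source: :database}
end

defmodule DispatchSketch do
  # Direct backend: invoke the real function.
  def call(:direct, mod, fun, args), do: apply(mod, fun, args)

  # Stub backend: look the call up in a map keyed by {module, function, args}.
  def call({:stub, responses}, mod, fun, args) do
    Map.fetch!(responses, {mod, fun, args})
  end
end

# The calling code is identical for both backends.
lookup = fn backend ->
  DispatchSketch.call(backend, UserReads, :find_by_email, ["a@example.com"])
end

real = lookup.(:direct)

canned = %{
  {UserReads, :find_by_email, ["a@example.com"]} => %{email: "a@example.com", source: :stub}
}

stubbed = lookup.({:stub, canned})
```

Because a read is identified by module, function name, and arguments, a test only needs to supply a map of expected calls to responses, with no database involved.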
Command
Dispatch commands (mutations) through a unified handler:
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.{Command, Fresh}

# Define command structs
defmodule CreateTodo do
  defstruct [:title, :priority]
end

defmodule DeleteTodo do
  defstruct [:id]
end

# Define a command handler that routes via pattern matching
defmodule MyCommandHandler do
  use Skuld.Syntax

  def handle(%CreateTodo{title: title, priority: priority}) do
    comp do
      id <- Fresh.fresh_uuid()
      {:ok, %{id: id, title: title, priority: priority}}
    end
  end

  def handle(%DeleteTodo{id: id}) do
    comp do
      {:ok, %{deleted: id}}
    end
  end
end

# Execute commands through the effect system
comp do
  {:ok, todo} <- Command.execute(%CreateTodo{title: "Buy milk", priority: :high})
  todo
end
|> Command.with_handler(&MyCommandHandler.handle/1)
|> Fresh.with_uuid7_handler()
|> Comp.run!()
#=> %{id: "01945a3b-...", title: "Buy milk", priority: :high}

The handler function returns a computation, so commands can use other effects (Fresh, DB, EventAccumulator, etc.) internally. This enables a clean separation between command dispatch and command implementation.
EventAccumulator
Accumulate domain events during computation (built on Writer):
use Skuld.Syntax
alias Skuld.Comp
alias Skuld.Effects.EventAccumulator

comp do
  _ <- EventAccumulator.emit(%{type: :user_created, id: 1})
  _ <- EventAccumulator.emit(%{type: :email_sent, to: "user@example.com"})
  :ok
end
|> EventAccumulator.with_handler(output: fn result, events -> {result, events} end)
|> Comp.run!()
#=> {:ok, [%{type: :user_created, id: 1}, %{type: :email_sent, to: "user@example.com"}]}
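
The decide and evolve steps of the Decider pattern can be sketched in plain Elixir, independent of Skuld. Everything named here (TodoDecider, the tuple-shaped commands, the event maps) is hypothetical and chosen only for illustration: decide turns a command and the current state into a list of events, and evolve folds one event into the next state. In a Skuld pipeline the events would presumably be emitted through EventAccumulator rather than collected by hand.

```elixir
defmodule TodoDecider do
  # decide: command + current state -> list of events (no side effects).
  def decide({:create_todo, id, title}, todos) do
    if Map.has_key?(todos, id) do
      # Creating an existing todo produces no events.
      []
    else
      [%{type: :todo_created, id: id, title: title}]
    end
  end

  def decide({:delete_todo, id}, todos) do
    if Map.has_key?(todos, id), do: [%{type: :todo_deleted, id: id}], else: []
  end

  # evolve: state + one event -> next state.
  def evolve(todos, %{type: :todo_created, id: id, title: title}), do: Map.put(todos, id, title)
  def evolve(todos, %{type: :todo_deleted, id: id}), do: Map.delete(todos, id)
end

commands = [
  {:create_todo, 1, "Buy milk"},
  {:create_todo, 1, "Duplicate"},
  {:delete_todo, 2}
]

# Run each command against the state produced by the events so far.
{final_state, all_events} =
  Enum.reduce(commands, {%{}, []}, fn command, {state, events} ->
    new_events = TodoDecider.decide(command, state)
    next_state = Enum.reduce(new_events, state, fn event, acc -> TodoDecider.evolve(acc, event) end)
    {next_state, events ++ new_events}
  end)
```

Only the first command yields an event; the duplicate create and the delete of an unknown id are rejected by decide, so the state ends up holding a single todo. Persisting `all_events` or `final_state` would be the last step of the pipeline.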