Query
Heavily inspired by Haxl.
Skuld's query system solves the N+1 problem with automatic batching and
concurrency. You write simple per-record fetch calls; the runtime batches
concurrent calls of the same type into a single executor invocation, and the
query macro automatically runs independent fetches concurrently.
The system has three layers:
- `query` do-block — compose fetch operations with automatic concurrency
- `deffetch` contracts — declare typed, batchable fetch operations
- `Query.Cache` — cross-batch caching and within-batch deduplication
The N+1 Problem
The N+1 problem occurs when code fetches a collection of N items and then makes a separate query for each one — 1 query for the list, then N queries for related data. This happens naturally whenever per-record fetch logic is spread across helper functions, and the caller iterates a collection.
Example: CSV Import Processing
Consider importing rows from a staging table, where processing each row needs to look up related records:
```elixir
# WITHOUT Skuld — classic N+1
def process_import(staging_rows) do
  Enum.map(staging_rows, fn row ->
    # Each call hits the DB individually — N queries for users, N for accounts
    user = Repo.get_by(User, email: row.email)
    account = Repo.get_by(Account, external_id: row.account_ref)
    process_row(row, user, account)
  end)
end
```

With 1,000 staging rows, this makes 2,000 individual queries. With Skuld's query system, you write the same per-record logic, but the runtime batches automatically:
```elixir
# WITH Skuld — per-record calls, automatic batching
defmodule Import.Queries do
  use Skuld.Query.Contract

  deffetch get_user_by_email(email :: String.t()) :: User.t() | nil
  deffetch get_account_by_ref(ref :: String.t()) :: Account.t() | nil
end
```
```elixir
# Processing a single row — reads like normal per-record code.
# The two fetches are independent, so `query` runs them concurrently.
def process_row(row) do
  query do
    user <- Import.Queries.get_user_by_email(row.email)
    account <- Import.Queries.get_account_by_ref(row.account_ref)
    do_process(row, user, account)
  end
end

# Process all rows — FiberPool.map spawns concurrent fibers,
# and all get_user_by_email calls batch into one executor invocation,
# all get_account_by_ref calls batch into another.
FiberPool.map(staging_rows, &process_row/1)
|> Import.Queries.with_executor(Import.Queries.EctoExecutor)
|> FiberPool.with_handler()
|> Comp.run!()
# Result: 2 batched queries instead of 2,000 individual ones
```

The per-record code is simple and composable — process_row doesn't know or
care that it runs concurrently alongside hundreds of other rows. The batching
happens at the runtime level when the FiberPool scheduler collects suspended
fibers and groups their batch requests by type.
This works even when the N+1 is buried deep in the call stack — a helper
function three levels down that calls get_user_by_email still participates in
the same batch, because batching is a property of the runtime, not the call
site.
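To sketch that point, the helper names below are invented for illustration (only the `query`/`deffetch` API shown above is assumed) — the fetch sits two helpers deep, yet still suspends into the shared batch:

```elixir
# Hypothetical helpers — enrich_row/1 and lookup_owner/1 are invented
# names. The deffetch call at the bottom of the stack still batches
# with every other concurrent fiber's call, because batching is a
# property of the runtime, not of where the call appears in code.
def process_row(row) do
  query do
    owner <- enrich_row(row)
    do_process(row, owner)
  end
end

def enrich_row(row), do: lookup_owner(row)

def lookup_owner(row) do
  # Suspends this fiber; the scheduler groups it with all other
  # pending get_user_by_email suspensions into one executor call.
  Import.Queries.get_user_by_email(row.email)
end
```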
Streaming: Constant-Memory Batching
The FiberPool.map example above loads all staging rows into memory at once.
For large imports — millions of rows, or an unbounded stream of incoming
data — you can use Brook (Skuld's streaming
API) for constant-memory batched processing. Brook uses bounded channel
buffers for backpressure: only a fixed window of items is in flight at any
time, while batching still works across all concurrent fibers in that window.
```elixir
# Constant-memory streaming import — batching + backpressure
comp do
  # Stream rows from the staging table (or any source)
  source <- Brook.from_function(fn ->
    case StagingTable.next_batch() do
      {:ok, rows} -> {:items, rows}
      :done -> :done
    end
  end, chunk_size: 50)

  # Process rows with bounded concurrency — buffer size controls
  # how many chunks are in flight (and thus how many fibers batch together)
  Brook.map(source, &process_row/1, concurrency: 10, buffer: 5)
  |> Brook.run(fn result -> persist_result(result) end)
end
|> Import.Queries.with_executor(Import.Queries.EctoExecutor)
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run!()
```

With concurrency: 10 and chunk_size: 50, at most ~500 rows are being
processed at any time. The deffetch calls from those ~500 rows batch into
just a handful of executor invocations per round, regardless of whether the
total import is 1,000 rows or 10 million. Memory stays bounded, and the
batching efficiency is the same as the in-memory version.
See the Brook documentation for the full
streaming API, including from_enum, from_function, map, filter, each,
and backpressure configuration.
Example: GraphQL Resolvers
GraphQL is the canonical N+1 scenario. A query like { posts { author { name } } }
resolves each post's author field individually — if there are 50 posts, that's
50 author lookups:
```elixir
# WITHOUT batching — N+1 in every list-of-objects resolver
object :post do
  field :author, :user do
    resolve fn post, _, _ ->
      {:ok, Repo.get(User, post.author_id)} # called once per post
    end
  end
end
```

With Skuld's query system, per-field resolvers stay simple while batching happens automatically across all concurrent fibers in a resolution round:
```elixir
# Skuld query contracts for the data layer
defmodule Blog.Queries do
  use Skuld.Query.Contract

  deffetch get_user(id :: String.t()) :: User.t() | nil
  deffetch get_comments(post_id :: String.t()) :: [Comment.t()]
end

# Per-field resolver logic — still per-record, but batches automatically
def resolve_author(post) do
  Blog.Queries.get_user(post.author_id)
end
```

When 50 posts resolve their author field concurrently, all 50 get_user calls are batched into a single executor invocation that receives all 50 author IDs at once.
(Absinthe already has its own batch-scheduling infrastructure via
Absinthe.Middleware.Dataloader, so Skuld's query system wouldn't add much
there. But if you were building your own GraphQL execution engine, Skuld's
fiber-based batching would be a good foundation.)
The query Macro
The query macro (Skuld.Query.QueryBlock) provides the lowest-boilerplate way to
compose batchable fetch computations. It analyses variable dependencies at compile
time and automatically groups independent bindings into concurrent fiber batches.
```elixir
use Skuld.Syntax

query do
  user <- Users.get_user(id)
  recent <- Orders.get_recent()
  orders <- Orders.get_by_user(user.id)
  {user, recent, orders}
end
```

The right-hand side of each <- binding can be any Skuld computation — a
Comp.pure(...), a comp do ... end block, a State.get(), or any other
effectful expression. However, the caller functions generated by deffetch
contracts (described below) are specifically designed to work with the query
batching system: when multiple fibers make the same type of deffetch call
concurrently, the runtime automatically batches them into a single executor
invocation. The deffetch contracts also provide compile-time benefits — typed
@specs for LSP support, generated @callbacks for executor implementations,
and compiler checks on the executor behaviour.
Here get_user and get_recent are independent (neither references the other's
result), so they run concurrently in the same fiber batch. get_by_user depends on
user, so it runs in a subsequent batch after the first completes.
Syntax
- `var <- computation` — effectful binding (auto-batched if independent)
- `var = expression` — pure binding (participates in dependency analysis)
- Last expression — the block's return value (auto-lifted to `Comp.pure`)
How It Works
The macro performs compile-time dependency analysis:
- Parse the block into `<-`/`=` bindings plus a final expression
- For each binding, compute which bound variables from earlier bindings are referenced in its right-hand side
- Group bindings into "rounds" using topological sort — a binding goes in the earliest round where all its dependencies are satisfied
- Single-binding rounds use `FiberPool.fiber` + `await!`; multi-binding rounds use `FiberPool.fiber_all` + `await_all!`
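The round-grouping step itself is ordinary data manipulation. A minimal sketch in plain Elixir, independent of Skuld (bindings represented as hypothetical `{name, deps}` pairs in source order, so each binding's dependencies are assigned a round before the binding itself):

```elixir
defmodule RoundsSketch do
  # A binding with no deps goes in round 0; otherwise its round is
  # 1 + the latest round among its dependencies.
  def rounds(bindings) do
    bindings
    |> Enum.reduce(%{}, fn {name, deps}, round_of ->
      round =
        deps
        |> Enum.map(&Map.fetch!(round_of, &1))
        |> Enum.max(fn -> -1 end)
        |> Kernel.+(1)

      Map.put(round_of, name, round)
    end)
    |> Enum.group_by(fn {_name, round} -> round end, fn {name, _} -> name end)
    |> Enum.sort()
    |> Enum.map(fn {_round, names} -> Enum.sort(names) end)
  end
end

# user and recent are independent (round 0); orders depends on user (round 1)
RoundsSketch.rounds([{:user, []}, {:recent, []}, {:orders, [:user]}])
#=> [[:recent, :user], [:orders]]
```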
The above example compiles to roughly:

```elixir
FiberPool.fiber_all([Users.get_user(id), Orders.get_recent()])
|> Comp.bind(&FiberPool.await_all!/1)
|> Comp.bind(fn [user, recent] ->
  FiberPool.fiber(Orders.get_by_user(user.id))
  |> Comp.bind(&FiberPool.await!/1)
  |> Comp.bind(fn orders ->
    Comp.pure({user, recent, orders})
  end)
end)
```

Or equivalently, using comp with the fiber boilerplate explicit:
```elixir
comp do
  handles <- FiberPool.fiber_all([Users.get_user(id), Orders.get_recent()])
  [user, recent] <- FiberPool.await_all!(handles)
  handle <- FiberPool.fiber(Orders.get_by_user(user.id))
  orders <- FiberPool.await!(handle)
  {user, recent, orders}
end
```

Differences from comp

Both comp and query use do-block syntax with <- bindings, but:
- `comp` is purely sequential — each binding waits for the previous one
- `query` analyses dependencies and runs independent bindings concurrently
Use comp for sequential effect chains (state updates, error handling). Use
query when composing data fetches that may be independent.
Requirements
query requires a FiberPool handler to be installed, since it spawns fibers
for concurrency.
Defining Fetch Operations with deffetch
Query.Contract (Skuld.Query.Contract) is a typed DSL for declaring batchable
fetch operations. It generates the operation structs, caller functions, executor
behaviour, and wiring that the query macro (and FiberPool) use for batching.
Defining a Contract
```elixir
defmodule MyApp.Queries.Users do
  use Skuld.Query.Contract

  deffetch get_user(id :: String.t()) :: User.t() | nil
  deffetch get_users_by_org(org_id :: String.t()) :: [User.t()]
  deffetch get_user_count(org_id :: String.t()) :: non_neg_integer()
end
```

Each deffetch declaration generates:
- Operation struct — e.g. `MyApp.Queries.Users.GetUser` with typed fields
- Caller function — e.g. `get_user/1` returning `computation(User.t() | nil)`, which suspends the current fiber for batched execution
- Executor callback — on `MyApp.Queries.Users.Executor`, receiving a list of `{ref, op_struct}` tuples and returning `computation(%{ref => result})`
- Dispatch function — `__dispatch__/3` routes from batch key to executor callback
- Wiring function — `with_executor/2` installs an executor for all fetches
- Bang variant — when the return type contains `{:ok, T}` (see below)
- Introspection — `__query_operations__/0` returns metadata
Implementing an Executor
An executor implements the generated behaviour with one callback per fetch:
```elixir
defmodule MyApp.Queries.Users.EctoExecutor do
  use Skuld.Syntax
  import Ecto.Query

  @behaviour MyApp.Queries.Users.Executor

  alias Skuld.Effects.Reader
  alias MyApp.Queries.Users.{GetUser, GetUsersByOrg, GetUserCount}

  @impl true
  defcomp get_user(ops) do
    ids = Enum.map(ops, fn {_ref, %GetUser{id: id}} -> id end) |> Enum.uniq()
    repo <- Reader.ask(:repo)
    results = repo.all(from u in User, where: u.id in ^ids)
    by_id = Map.new(results, &{&1.id, &1})

    Map.new(ops, fn {ref, %GetUser{id: id}} ->
      {ref, Map.get(by_id, id)}
    end)
  end

  @impl true
  defcomp get_users_by_org(ops) do
    org_ids = Enum.map(ops, fn {_ref, %GetUsersByOrg{org_id: oid}} -> oid end) |> Enum.uniq()
    repo <- Reader.ask(:repo)
    results = repo.all(from u in User, where: u.org_id in ^org_ids)
    grouped = Enum.group_by(results, & &1.org_id)

    Map.new(ops, fn {ref, %GetUsersByOrg{org_id: oid}} ->
      {ref, Map.get(grouped, oid, [])}
    end)
  end

  @impl true
  defcomp get_user_count(ops) do
    org_ids = Enum.map(ops, fn {_ref, %GetUserCount{org_id: oid}} -> oid end) |> Enum.uniq()
    repo <- Reader.ask(:repo)

    counts =
      repo.all(from u in User, where: u.org_id in ^org_ids,
        group_by: u.org_id, select: {u.org_id, count(u.id)})
      |> Map.new()

    Map.new(ops, fn {ref, %GetUserCount{org_id: oid}} ->
      {ref, Map.get(counts, oid, 0)}
    end)
  end
end
```

The pattern is always:
- Extract unique parameter values from the batched ops
- Execute a single query covering all values
- Map results back to `%{request_id => result}`
Since executor callbacks return computations, they can use any Skuld effect (Reader, State, DB, Port, etc.).
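Stripped of the effect machinery, those three steps are plain data manipulation. A standalone sketch (an in-memory map stands in for the database; module and data names are invented for the demo):

```elixir
# The batched-executor shape in isolation. ops is a list of {ref, id}
# pairs, standing in for {ref, %GetUser{id: id}} tuples.
defmodule BatchShape do
  @users %{"1" => %{id: "1", name: "Ada"}, "2" => %{id: "2", name: "Grace"}}

  def run(ops) do
    # 1. Extract unique parameter values from the batched ops
    ids = ops |> Enum.map(fn {_ref, id} -> id end) |> Enum.uniq()

    # 2. One lookup covering all ids (a single SQL query in a real executor)
    by_id = Map.take(@users, ids)

    # 3. Map results back to %{ref => result}
    Map.new(ops, fn {ref, id} -> {ref, Map.get(by_id, id)} end)
  end
end

# Duplicate ids are deduplicated before the "query", but every ref
# still gets its own entry in the result map.
BatchShape.run([{:r1, "1"}, {:r2, "2"}, {:r3, "1"}])
```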
Wiring
Install an executor for a contract using with_executor/2:
```elixir
query do
  user <- Users.get_user("1")
  recent <- Orders.get_recent()
  {user, recent}
end
|> Users.with_executor(Users.EctoExecutor)
|> Orders.with_executor(Orders.EctoExecutor)
|> FiberPool.with_handler()
|> Comp.run!()
```

with_executor/2 registers the executor for all fetches in the contract. Each fetch type batches independently — get_user calls batch together, get_users_by_org calls batch together, but they don't mix.
Bulk Wiring
For applications with multiple contracts, use Skuld.Query.Contract.with_executors/2:
```elixir
my_query_comp
|> Skuld.Query.Contract.with_executors([
  {MyApp.Queries.Users, MyApp.Queries.Users.EctoExecutor},
  {MyApp.Queries.Orders, MyApp.Queries.Orders.EctoExecutor}
])
```

Also accepts a map:
```elixir
my_query_comp
|> Skuld.Query.Contract.with_executors(%{
  MyApp.Queries.Users => MyApp.Queries.Users.EctoExecutor,
  MyApp.Queries.Orders => MyApp.Queries.Orders.EctoExecutor
})
```

Bang Variants
Same rules as Port.Contract:
- Auto-detect (default): if the return type contains `{:ok, T}`, a bang variant is generated that unwraps `{:ok, value}` or dispatches `Throw` on `{:error, reason}`.
- `bang: true` — force standard unwrapping.
- `bang: false` — suppress bang generation.
- `bang: unwrap_fn` — custom unwrap function.
```elixir
defmodule MyApp.Queries.Accounts do
  use Skuld.Query.Contract

  # Auto-detect: {:ok, T} in return type -> bang generated
  deffetch find_account(id :: String.t()) :: {:ok, Account.t()} | {:error, term()}

  # No {:ok, T} pattern -> no bang
  deffetch get_account(id :: String.t()) :: Account.t() | nil

  # Force bang
  deffetch lookup_account(id :: String.t()) :: Account.t() | nil, bang: true

  # Suppress bang
  deffetch search_accounts(q :: String.t()) :: {:ok, [Account.t()]} | {:error, term()},
    bang: false

  # Custom unwrap
  deffetch fetch_account(id :: String.t()) :: map(),
    bang: fn result -> {:ok, result} end
end
```

Introspection
Every contract module provides __query_operations__/0:
```elixir
MyApp.Queries.Users.__query_operations__()
#=> [
#     %{name: :get_user, params: [:id], param_types: [...], return_type: ..., arity: 1, cacheable: true},
#     %{name: :get_users_by_org, params: [:org_id], ..., cacheable: true},
#     %{name: :get_user_count, params: [:org_id], ..., cacheable: true}
#   ]
```

Caching
Skuld.Query.Cache provides a composable caching layer that sits between your
computation and the batch executors:
- Cross-batch result caching — identical queries across batch rounds return cached results without re-executing
- Within-batch request deduplication — identical queries in the same batch round are sent to the executor only once, with the result fanned out to all requesting fibers
Wiring with Cache
Use QueryCache.with_executor/3 or QueryCache.with_executors/2 instead of
Contract.with_executor/2:
```elixir
alias Skuld.Query.Cache, as: QueryCache

# Single contract
my_comp
|> QueryCache.with_executor(MyApp.Queries.Users, MyApp.Queries.Users.EctoExecutor)
|> FiberPool.with_handler()
|> Comp.run()

# Multiple contracts (shared cache scope)
my_comp
|> QueryCache.with_executors([
  {MyApp.Queries.Users, MyApp.Queries.Users.EctoExecutor},
  {MyApp.Queries.Orders, MyApp.Queries.Orders.EctoExecutor}
])
|> FiberPool.with_handler()
|> Comp.run()
```

Cache Scope and Lifetime
The cache is scoped per computation run. It's initialised as an empty map when
with_executors sets up the scope, and cleaned up on scope exit. No TTL, no
eviction — the cache lives exactly as long as the computation.
This matches Haxl's per-Env cache semantics: within a single request/computation,
you get automatic deduplication and caching, with no stale data concerns.
Cache Keys
Cache keys are {batch_key, op_struct} where:
- `batch_key` is `{ContractModule, :query_name}` (e.g., `{Users, :get_user}`)
- `op_struct` is the operation struct (e.g., `%Users.GetUser{id: "123"}`)
Structural equality on Elixir structs provides correct comparison. Two queries with the same parameters share a cache entry.
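The structural-equality point can be shown in isolation with plain Elixir (the `Demo.GetUser` struct below is defined locally for the demonstration and is not part of Skuld):

```elixir
# Structs with equal fields are equal terms, so two independently
# built cache keys address the same map entry.
defmodule Demo.GetUser do
  defstruct [:id]
end

key_a = {{Demo.Users, :get_user}, %Demo.GetUser{id: "123"}}
key_b = {{Demo.Users, :get_user}, %Demo.GetUser{id: "123"}}

cache = Map.put(%{}, key_a, :cached_user)

# key_b was built separately but is structurally equal to key_a
Map.get(cache, key_b)
#=> :cached_user
```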
Per-Query Opt-Out
Mark individual queries as non-cacheable with cache: false:
```elixir
defmodule MyApp.Queries.Users do
  use Skuld.Query.Contract

  deffetch get_user(id :: String.t()) :: User.t() | nil
  deffetch get_random_user() :: User.t(), cache: false
end
```

Non-cacheable queries bypass the cache entirely — they always go to the executor.
Error Handling
Executor failures are not cached. When an executor returns a Throw/error:
- The error propagates normally to requesting fibers
- Nothing is stored in the cache
- Subsequent requests for the same query go to the executor again
How Batching Works
Under the hood, the query system uses FiberPool's batch suspension mechanism:
- Each caller function (e.g., `get_user/1`) creates an `InternalSuspend.batch` with a batch key of `{ContractModule, :query_name}` and an operation struct
- The FiberPool scheduler collects batch-suspended fibers
- When the run queue empties, the scheduler groups suspensions by batch key
- For each group, the registered executor callback runs with all ops
- Results are distributed back to the waiting fibers via the `request_id` map
The programmer writes simple per-record fetch calls. Batching happens automatically when multiple fibers make the same type of fetch concurrently.
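The grouping of suspensions by batch key is a plain group-by. A sketch in standalone Elixir (the tuple shape `{batch_key, ref, op}` is a simplified stand-in for the real suspension records):

```elixir
# Three suspended fibers, two distinct batch keys: the scheduler runs
# one executor invocation per key, not one per fiber.
suspensions = [
  {{Users, :get_user}, :r1, %{id: "1"}},
  {{Orders, :get_recent}, :r2, %{}},
  {{Users, :get_user}, :r3, %{id: "2"}}
]

groups =
  Enum.group_by(
    suspensions,
    fn {key, _ref, _op} -> key end,
    fn {_key, ref, op} -> {ref, op} end
  )

# Two batch keys -> two executor invocations for three suspended fibers
map_size(groups)
#=> 2
```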
Manual FiberPool Composition
For cases where the query macro isn't suitable, you can compose fetches
manually using FiberPool primitives:
```elixir
# Using FiberPool.map for collections
FiberPool.map(["1", "2", "3"], &Users.get_user/1)
|> Users.with_executor(Users.EctoExecutor)
|> FiberPool.with_handler()
|> Comp.run!()
# All 3 get_user calls batched into a single executor invocation

# Using fiber/await for individual control
comp do
  h1 <- FiberPool.fiber(Users.get_user("1"))
  h2 <- FiberPool.fiber(Users.get_user("2"))
  h3 <- FiberPool.fiber(Users.get_user("3"))
  FiberPool.await_all!([h1, h2, h3])
end
```

Testing
For tests, implement a stub executor:
```elixir
defmodule MyApp.Queries.Users.TestExecutor do
  use Skuld.Syntax

  @behaviour MyApp.Queries.Users.Executor

  alias MyApp.Queries.Users.{GetUser, GetUsersByOrg}

  @impl true
  defcomp get_user(ops) do
    Map.new(ops, fn {ref, %GetUser{id: id}} ->
      {ref, %User{id: id, name: "Test User #{id}"}}
    end)
  end

  @impl true
  defcomp get_users_by_org(ops) do
    Map.new(ops, fn {ref, %GetUsersByOrg{org_id: _}} ->
      {ref, []}
    end)
  end

  # ...
end
```

Or use inline anonymous executors for one-off tests (register directly with BatchExecutor.with_executor/3):
```elixir
alias Skuld.Fiber.FiberPool.BatchExecutor

query do
  user <- Users.get_user("1")
  user
end
|> BatchExecutor.with_executor(
  {Users, :get_user},
  fn ops ->
    Comp.pure(Map.new(ops, fn {ref, _op} -> {ref, %User{id: "1", name: "Stubbed"}} end))
  end
)
|> FiberPool.with_handler()
|> Comp.run!()
```

Comparison with Port.Contract
| Aspect | Port.Contract | Query.Contract |
|---|---|---|
| Purpose | Blocking calls to external code | Batchable queries across fibers |
| Execution | One call at a time | Multiple calls batched together |
| Requires FiberPool | No | Yes |
| Executor receives | Single call args | List of {ref, op_struct} |
| Executor returns | Single result | %{ref => result} map |
| Macro | defport | deffetch |
| Bang variants | Same rules | Same rules |
| Handler installation | Port.with_handler/2 | Contract.with_executor/2 |
| Composition | comp do (sequential) | query do (auto-concurrent) |