Skuld.Effects.FiberPool (skuld v0.23.0)
View SourceEffect for cooperative fiber-based concurrency.
FiberPool provides lightweight cooperative concurrency within a single process. Fibers are scheduled cooperatively - they run until they await, complete, or error.
Basic Usage
comp do
# Run work as a fiber (cooperative, same process)
h1 <- FiberPool.fiber(expensive_computation())
h2 <- FiberPool.fiber(another_computation())
# Do other work while fibers run...
# Await results (raises on error)
r1 <- FiberPool.await!(h1)
r2 <- FiberPool.await!(h2)
{r1, r2}
end
|> FiberPool.with_handler()
|> Comp.run()Structured Concurrency
Use scope/1 to ensure all spawned fibers complete before exiting:
comp do
FiberPool.scope(comp do
h1 <- FiberPool.fiber(work1())
h2 <- FiberPool.fiber(work2())
# Both h1 and h2 will complete (or be cancelled) before scope exits
FiberPool.await_all!([h1, h2])
end)
endComparison with Async
FiberPool replaces the Async effect with a simpler, more focused design:
- Fibers only (no Tasks in this module - see Task integration milestone)
- Simpler state management
- Designed for I/O batching integration
Summary
Functions
Applicative ap — run a function-producing computation and a value-producing
computation concurrently as FiberPool fibers, then apply the function to
the value.
Await a single fiber's result.
Await a single fiber's result, raising on error.
Await all fibers' results.
Await all fibers' results, raising on any error.
Await any fiber's result.
Await any fiber's result, raising on error.
Await a fiber's result with single-consumer semantics.
Cancel a fiber.
Run a computation as a fiber (cooperative, same process).
Spawn each computation as a fiber, returning all handles.
Run a list of computations concurrently as fibers, returning all results in order.
Map a function over items, running each result computation as a fiber, and return all results in order.
Create a structured concurrency scope.
Run a thunk as a BEAM Task (parallel, separate process).
Install the FiberPool handler for a computation.
Install a Task.Supervisor for BEAM task support.
Functions
@spec ap(Skuld.Comp.Types.computation(), Skuld.Comp.Types.computation()) :: Skuld.Comp.Types.computation()
Applicative ap — run a function-producing computation and a value-producing
computation concurrently as FiberPool fibers, then apply the function to
the value.
This is the standard applicative functor <*> operation. Both computations
are spawned as cooperative fibers within the same FiberPool, so their effects
(including data fetches) land in the same batch round, enabling implicit
concurrency.
How it achieves concurrency
ap runs exactly two computations concurrently: one that produces a
function, and one that produces a value. Both are spawned as fibers and
awaited together, so their data fetches land in the same batch round.
When both complete, the function is applied to the value.
To run more than two operations concurrently, ap is applied repeatedly
(like cons building a list). Each application adds one more concurrent
computation by pairing it with a function that accumulates results:
# Three concurrent fetches via repeated ap:
ap(
ap(
Comp.map(fetch(:x), fn x -> fn y -> [y, x] end end),
fetch(:y)
),
fetch(:z)
)
|> Comp.map(fn [z, y, x] -> {x, y, z} end)Each ap spawns two fibers — the accumulated computation so far (which
returns a function) and the next value computation. The function captures
previous results in a closure and conses the new value onto them. This is
the standard applicative pattern from Haskell's <*>, where liftA2,
liftA3, etc. are built by chaining <*> with fmap.
In practice, the query macro handles this desugaring automatically —
users rarely need to call ap directly.
Requires a FiberPool handler to be installed.
Example
comp_f = Comp.pure(fn x -> x * 2 end)
comp_a = Comp.pure(21)
result = FiberPool.ap(comp_f, comp_a)
# result is a computation that returns 42
@spec await(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()
Await a single fiber's result.
Suspends the current fiber until the target fiber completes.
Returns {:ok, value} on success or {:error, reason} on failure.
Example
comp do
h <- FiberPool.fiber(some_work())
result <- FiberPool.await(h)
case result do
{:ok, value} -> # use value
{:error, reason} -> # handle error
end
end
@spec await!(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()
Await a single fiber's result, raising on error.
Suspends the current fiber until the target fiber completes. Returns the result value directly, or raises if the fiber errored.
Example
comp do
h <- FiberPool.fiber(some_work())
value <- FiberPool.await!(h) # raises on error
# use value
end
@spec await_all([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()
Await all fibers' results.
Suspends until all fibers complete. Returns results in the same order
as the input handles, each as {:ok, value} or {:error, reason}.
@spec await_all!([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()
Await all fibers' results, raising on any error.
Suspends until all fibers complete. Returns result values in the same order as the input handles. Raises if any fiber errored.
@spec await_any([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()
Await any fiber's result.
Suspends until at least one fiber completes. Returns {handle, result}
where result is {:ok, value} or {:error, reason}.
@spec await_any!([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()
Await any fiber's result, raising on error.
Suspends until at least one fiber completes. Returns {handle, value}
for the first fiber to complete. Raises if the fiber errored.
@spec await_consume(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()
Await a fiber's result with single-consumer semantics.
Like await/1, but removes the result from the completed map after retrieval.
Use this when you know the fiber will only be awaited once, to enable
garbage collection of the result.
This is used internally by Channel.take_async for memory-efficient streaming.
@spec cancel(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()
Cancel a fiber.
Marks the fiber for cancellation. If the fiber is suspended, it will be woken with an error. If running, it will be cancelled at the next suspension point.
@spec fiber( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Run a computation as a fiber (cooperative, same process).
Returns a handle that can be used to await the result.
Options
:priority- Fiber priority (future use)
@spec fiber_all([Skuld.Comp.Types.computation()]) :: Skuld.Comp.Types.computation()
Spawn each computation as a fiber, returning all handles.
The handles can be awaited individually or with await_all/1 / await_all!/1.
Example
comp do
handles <- FiberPool.fiber_all([fetch(:x), fetch(:y), fetch(:z)])
FiberPool.await_all!(handles)
end
@spec fiber_await_all([Skuld.Comp.Types.computation()]) :: Skuld.Comp.Types.computation()
Run a list of computations concurrently as fibers, returning all results in order.
Each computation is spawned as a fiber, then all are awaited with await_all!/1.
Raises if any fiber fails.
A single-element list is optimised to skip fiber overhead.
This is the primitive used by the query macro for independent binding groups.
Example
FiberPool.fiber_await_all([fetch(:x), fetch(:y), fetch(:z)])
# returns a computation producing [x_result, y_result, z_result]
@spec map([a], (a -> Skuld.Comp.Types.computation())) :: Skuld.Comp.Types.computation() when a: term()
Map a function over items, running each result computation as a fiber, and return all results in order.
This is a convenience combining Enum.map/2, fiber_all/1, and await_all!/1.
Raises if any fiber fails.
Example
FiberPool.map([1, 2, 3], &Queries.get_user/1)
# returns a computation producing [user1, user2, user3]
@spec scope( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Create a structured concurrency scope.
All fibers submitted within the scope will be awaited before the scope returns. If the scope body completes normally, all fibers are awaited and their results discarded (the scope returns the body's result). If the scope body errors, all fibers are cancelled.
Options
:on_exit- Optional callbackfn result, handles -> computation()called before awaiting fibers. Can be used for custom cleanup.
Example
FiberPool.scope(comp do
h1 <- FiberPool.fiber(work1())
h2 <- FiberPool.fiber(work2())
# Both h1 and h2 will complete before scope exits
FiberPool.await!(h1)
end)
@spec task( (-> term()), keyword() ) :: Skuld.Comp.Types.computation()
Run a thunk as a BEAM Task (parallel, separate process).
The thunk runs in a separate BEAM process, allowing true parallelism for CPU-bound work. Returns a handle that can be awaited just like fiber handles.
Important: The thunk is a zero-arity function, not a computation. Effects do not work inside tasks because they run in a different process. Extract any values you need from Reader/State before constructing the thunk.
Why thunks, not computations?
Tasks run in a separate BEAM process. While an effectful computation could be shipped to the other process and executed there, the modified environment it returns would then need to be integrated back into the calling process's environment. There is no general way to do this — environment changes from effects (State mutations, Writer accumulations, handler installations, etc.) have no guarantee of commutativity, so a task's environment modifications cannot be safely merged with whatever the calling process has done in the meantime.
Restricting tasks to plain thunks forces the caller to explicitly extract any needed values from the environment before spawning the task, and to explicitly handle the task's return value after awaiting it. This makes the cross-process boundary visible and avoids the need for an unsound automatic environment merge.
Options
:timeout- Task timeout in milliseconds (default: 5000)
Example
comp do
# Extract any needed values from effects first
config <- Reader.ask(:config)
# CPU-bound work runs in parallel - thunk captures what it needs
h1 <- FiberPool.task(fn -> expensive_cpu_work(config) end)
h2 <- FiberPool.task(fn -> another_cpu_work(config) end)
# Await results
r1 <- FiberPool.await!(h1)
r2 <- FiberPool.await!(h2)
{r1, r2}
end
@spec with_handler( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Install the FiberPool handler for a computation.
The handler collects fiber submissions and await operations.
Use run/1 or run!/1 to execute the computation with full
fiber scheduling.
@spec with_task_supervisor( Skuld.Comp.Types.computation(), keyword() ) :: Skuld.Comp.Types.computation()
Install a Task.Supervisor for BEAM task support.
This starts a Task.Supervisor process before the computation runs and
stops it after completion. Required for FiberPool.task/2 — calling
task/2 without this will raise an error.
Example
comp do
h <- FiberPool.task(fn -> expensive_cpu_work() end)
FiberPool.await!(h)
end
|> FiberPool.with_handler()
|> FiberPool.with_task_supervisor()
|> Comp.run!()Options
:supervisor- An existingTask.Supervisorpid to use instead of starting a new one. The caller is responsible for its lifecycle.