Skuld.Effects.FiberPool (skuld v0.23.0)

View Source

Effect for cooperative fiber-based concurrency.

FiberPool provides lightweight cooperative concurrency within a single process. Fibers are scheduled cooperatively - they run until they await, complete, or error.

Basic Usage

comp do
  # Run work as a fiber (cooperative, same process)
  h1 <- FiberPool.fiber(expensive_computation())
  h2 <- FiberPool.fiber(another_computation())

  # Do other work while fibers run...

  # Await results (raises on error)
  r1 <- FiberPool.await!(h1)
  r2 <- FiberPool.await!(h2)

  {r1, r2}
end
|> FiberPool.with_handler()
|> Comp.run()

Structured Concurrency

Use scope/1 to ensure all spawned fibers complete before exiting:

comp do
  FiberPool.scope(comp do
    h1 <- FiberPool.fiber(work1())
    h2 <- FiberPool.fiber(work2())
    # Both h1 and h2 will complete (or be cancelled) before scope exits
    FiberPool.await_all!([h1, h2])
  end)
end

Comparison with Async

FiberPool replaces the Async effect with a simpler, more focused design:

  • Fibers only (no Tasks in this module - see Task integration milestone)
  • Simpler state management
  • Designed for I/O batching integration

Summary

Functions

Applicative ap — run a function-producing computation and a value-producing computation concurrently as FiberPool fibers, then apply the function to the value.

Await a single fiber's result.

Await a single fiber's result, raising on error.

Await all fibers' results.

Await all fibers' results, raising on any error.

Await any fiber's result.

Await any fiber's result, raising on error.

Await a fiber's result with single-consumer semantics.

Cancel a fiber.

Run a computation as a fiber (cooperative, same process).

Spawn each computation as a fiber, returning all handles.

Run a list of computations concurrently as fibers, returning all results in order.

Map a function over items, running each result computation as a fiber, and return all results in order.

Create a structured concurrency scope.

Run a thunk as a BEAM Task (parallel, separate process).

Install the FiberPool handler for a computation.

Install a Task.Supervisor for BEAM task support.

Functions

ap(comp_f, comp_a)

Applicative ap — run a function-producing computation and a value-producing computation concurrently as FiberPool fibers, then apply the function to the value.

This is the standard applicative functor <*> operation. Both computations are spawned as cooperative fibers within the same FiberPool, so their effects (including data fetches) land in the same batch round, enabling implicit concurrency.

How it achieves concurrency

ap runs exactly two computations concurrently: one that produces a function, and one that produces a value. Both are spawned as fibers and awaited together, so their data fetches land in the same batch round. When both complete, the function is applied to the value.

To run more than two operations concurrently, ap is applied repeatedly (like cons building a list). Each application adds one more concurrent computation by pairing it with a function that accumulates results:

# Three concurrent fetches via repeated ap:
ap(
  ap(
    Comp.map(fetch(:x), fn x -> fn y -> [y, x] end end),
    fetch(:y)
  ),
  fetch(:z)
)
|> Comp.map(fn [z, y, x] -> {x, y, z} end)

Each ap spawns two fibers — the accumulated computation so far (which returns a function) and the next value computation. The function captures previous results in a closure and conses the new value onto them. This is the standard applicative pattern from Haskell's <*>, where liftA2, liftA3, etc. are built by chaining <*> with fmap.

In practice, the query macro handles this desugaring automatically — users rarely need to call ap directly.

Requires a FiberPool handler to be installed.

Example

comp_f = Comp.pure(fn x -> x * 2 end)
comp_a = Comp.pure(21)
result = FiberPool.ap(comp_f, comp_a)
# result is a computation that returns 42

await(handle)

@spec await(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Await a single fiber's result.

Suspends the current fiber until the target fiber completes. Returns {:ok, value} on success or {:error, reason} on failure.

Example

comp do
  h <- FiberPool.fiber(some_work())
  result <- FiberPool.await(h)
  case result do
    {:ok, value} -> # use value
    {:error, reason} -> # handle error
  end
end

await!(handle)

@spec await!(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Await a single fiber's result, raising on error.

Suspends the current fiber until the target fiber completes. Returns the result value directly, or raises if the fiber errored.

Example

comp do
  h <- FiberPool.fiber(some_work())
  value <- FiberPool.await!(h)  # raises on error
  # use value
end

await_all(handles)

@spec await_all([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await all fibers' results.

Suspends until all fibers complete. Returns results in the same order as the input handles, each as {:ok, value} or {:error, reason}.

await_all!(handles)

@spec await_all!([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await all fibers' results, raising on any error.

Suspends until all fibers complete. Returns result values in the same order as the input handles. Raises if any fiber errored.

await_any(handles)

@spec await_any([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await any fiber's result.

Suspends until at least one fiber completes. Returns {handle, result} where result is {:ok, value} or {:error, reason}.

await_any!(handles)

@spec await_any!([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await any fiber's result, raising on error.

Suspends until at least one fiber completes. Returns {handle, value} for the first fiber to complete. Raises if the fiber errored.

await_consume(handle)

@spec await_consume(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Await a fiber's result with single-consumer semantics.

Like await/1, but removes the result from the completed map after retrieval. Use this when you know the fiber will only be awaited once, to enable garbage collection of the result.

This is used internally by Channel.take_async for memory-efficient streaming.

cancel(handle)

@spec cancel(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Cancel a fiber.

Marks the fiber for cancellation. If the fiber is suspended, it will be woken with an error. If running, it will be cancelled at the next suspension point.

fiber(computation, opts \\ [])

Run a computation as a fiber (cooperative, same process).

Returns a handle that can be used to await the result.

Options

  • :priority - Fiber priority (future use)

fiber_all(comps)

Spawn each computation as a fiber, returning all handles.

The handles can be awaited individually or with await_all/1 / await_all!/1.

Example

comp do
  handles <- FiberPool.fiber_all([fetch(:x), fetch(:y), fetch(:z)])
  FiberPool.await_all!(handles)
end

fiber_await_all(comps)

Run a list of computations concurrently as fibers, returning all results in order.

Each computation is spawned as a fiber, then all are awaited with await_all!/1. Raises if any fiber fails.

A single-element list is optimised to skip fiber overhead.

This is the primitive used by the query macro for independent binding groups.

Example

FiberPool.fiber_await_all([fetch(:x), fetch(:y), fetch(:z)])
# returns a computation producing [x_result, y_result, z_result]

map(items, fun)

@spec map([a], (a -> Skuld.Comp.Types.computation())) ::
  Skuld.Comp.Types.computation()
when a: term()

Map a function over items, running each result computation as a fiber, and return all results in order.

This is a convenience combining Enum.map/2, fiber_all/1, and await_all!/1. Raises if any fiber fails.

Example

FiberPool.map([1, 2, 3], &Queries.get_user/1)
# returns a computation producing [user1, user2, user3]

scope(comp, opts \\ [])

Create a structured concurrency scope.

All fibers submitted within the scope will be awaited before the scope returns. If the scope body completes normally, all fibers are awaited and their results discarded (the scope returns the body's result). If the scope body errors, all fibers are cancelled.

Options

  • :on_exit - Optional callback fn result, handles -> computation() called before awaiting fibers. Can be used for custom cleanup.

Example

FiberPool.scope(comp do
  h1 <- FiberPool.fiber(work1())
  h2 <- FiberPool.fiber(work2())
  # Both h1 and h2 will complete before scope exits
  FiberPool.await!(h1)
end)

task(thunk, opts \\ [])

@spec task(
  (-> term()),
  keyword()
) :: Skuld.Comp.Types.computation()

Run a thunk as a BEAM Task (parallel, separate process).

The thunk runs in a separate BEAM process, allowing true parallelism for CPU-bound work. Returns a handle that can be awaited just like fiber handles.

Important: The thunk is a zero-arity function, not a computation. Effects do not work inside tasks because they run in a different process. Extract any values you need from Reader/State before constructing the thunk.

Why thunks, not computations?

Tasks run in a separate BEAM process. While an effectful computation could be shipped to the other process and executed there, the modified environment it returns would then need to be integrated back into the calling process's environment. There is no general way to do this — environment changes from effects (State mutations, Writer accumulations, handler installations, etc.) have no guarantee of commutativity, so a task's environment modifications cannot be safely merged with whatever the calling process has done in the meantime.

Restricting tasks to plain thunks forces the caller to explicitly extract any needed values from the environment before spawning the task, and to explicitly handle the task's return value after awaiting it. This makes the cross-process boundary visible and avoids the need for an unsound automatic environment merge.

Options

  • :timeout - Task timeout in milliseconds (default: 5000)

Example

comp do
  # Extract any needed values from effects first
  config <- Reader.ask(:config)

  # CPU-bound work runs in parallel - thunk captures what it needs
  h1 <- FiberPool.task(fn -> expensive_cpu_work(config) end)
  h2 <- FiberPool.task(fn -> another_cpu_work(config) end)

  # Await results
  r1 <- FiberPool.await!(h1)
  r2 <- FiberPool.await!(h2)

  {r1, r2}
end

with_handler(comp, opts \\ [])

Install the FiberPool handler for a computation.

The handler collects fiber submissions and await operations. Use run/1 or run!/1 to execute the computation with full fiber scheduling.

with_task_supervisor(comp, opts \\ [])

@spec with_task_supervisor(
  Skuld.Comp.Types.computation(),
  keyword()
) :: Skuld.Comp.Types.computation()

Install a Task.Supervisor for BEAM task support.

This starts a Task.Supervisor process before the computation runs and stops it after completion. Required for FiberPool.task/2 — calling task/2 without this will raise an error.

Example

comp do
  h <- FiberPool.task(fn -> expensive_cpu_work() end)
  FiberPool.await!(h)
end
|> FiberPool.with_handler()
|> FiberPool.with_task_supervisor()
|> Comp.run!()

Options

  • :supervisor - An existing Task.Supervisor pid to use instead of starting a new one. The caller is responsible for its lifecycle.