Skuld.Effects.FiberPool (skuld v0.2.3)

View Source

Effect for cooperative fiber-based concurrency.

FiberPool provides lightweight cooperative concurrency within a single process. Fibers are scheduled cooperatively - they run until they await, complete, or error.

Basic Usage

comp do
  # Run work as a fiber (cooperative, same process)
  h1 <- FiberPool.fiber(expensive_computation())
  h2 <- FiberPool.fiber(another_computation())

  # Do other work while fibers run...

  # Await results (raises on error)
  r1 <- FiberPool.await!(h1)
  r2 <- FiberPool.await!(h2)

  {r1, r2}
end
|> FiberPool.with_handler()
|> FiberPool.run()

Structured Concurrency

Use scope/1 to ensure all spawned fibers complete before exiting:

comp do
  FiberPool.scope(comp do
    h1 <- FiberPool.fiber(work1())
    h2 <- FiberPool.fiber(work2())
    # Both h1 and h2 will complete (or be cancelled) before scope exits
    FiberPool.await_all!([h1, h2])
  end)
end

Comparison with Async

FiberPool replaces the Async effect with a simpler, more focused design:

  • Fibers only (no Tasks in this module - see Task integration milestone)
  • Simpler state management
  • Designed for I/O batching integration

Summary

Functions

Await a single fiber's result.

Await a single fiber's result, raising on error.

Await all fibers' results.

Await all fibers' results, raising on any error.

Await any fiber's result.

Await any fiber's result, raising on error.

Await a fiber's result with single-consumer semantics.

Cancel a fiber.

Run a computation as a fiber (cooperative, same process).

Run a computation with FiberPool scheduling.

Run a computation with FiberPool scheduling, extracting the result.

Create a structured concurrency scope.

Run a thunk as a BEAM Task (parallel, separate process).

Install the FiberPool handler for a computation.

Functions

await(handle)

@spec await(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Await a single fiber's result.

Suspends the current fiber until the target fiber completes. Returns {:ok, value} on success or {:error, reason} on failure.

Example

comp do
  h <- FiberPool.fiber(some_work())
  result <- FiberPool.await(h)
  case result do
    {:ok, value} -> # use value
    {:error, reason} -> # handle error
  end
end

await!(handle)

@spec await!(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Await a single fiber's result, raising on error.

Suspends the current fiber until the target fiber completes. Returns the result value directly, or raises if the fiber errored.

Example

comp do
  h <- FiberPool.fiber(some_work())
  value <- FiberPool.await!(h)  # raises on error
  # use value
end

await_all(handles)

@spec await_all([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await all fibers' results.

Suspends until all fibers complete. Returns results in the same order as the input handles, each as {:ok, value} or {:error, reason}.

await_all!(handles)

@spec await_all!([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await all fibers' results, raising on any error.

Suspends until all fibers complete. Returns result values in the same order as the input handles. Raises if any fiber errored.

await_any(handles)

@spec await_any([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await any fiber's result.

Suspends until at least one fiber completes. Returns {handle, result} where result is {:ok, value} or {:error, reason}.

await_any!(handles)

@spec await_any!([Skuld.Fiber.Handle.t()]) :: Skuld.Comp.Types.computation()

Await any fiber's result, raising on error.

Suspends until at least one fiber completes. Returns {handle, value} for the first fiber to complete. Raises if the fiber errored.

await_consume(handle)

@spec await_consume(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Await a fiber's result with single-consumer semantics.

Like await/1, but removes the result from the completed map after retrieval. Use this when you know the fiber will only be awaited once, to enable garbage collection of the result.

This is used internally by Channel.take_async for memory-efficient streaming.

cancel(handle)

@spec cancel(Skuld.Fiber.Handle.t()) :: Skuld.Comp.Types.computation()

Cancel a fiber.

Marks the fiber for cancellation. If the fiber is suspended, it will be woken with an error. If running, it will be cancelled at the next suspension point.

fiber(computation, opts \\ [])

Run a computation as a fiber (cooperative, same process).

Returns a handle that can be used to await the result.

Options

  • :priority - Fiber priority (future use)

run(comp, opts \\ [])

Run a computation with FiberPool scheduling.

This is the main entry point. It:

  1. Runs the main computation
  2. Schedules any spawned fibers and tasks
  3. Returns when all fibers and tasks complete

Options

  • :task_supervisor - Task.Supervisor pid to use for spawning tasks. If not provided, a temporary supervisor is started.

run!(comp, opts \\ [])

@spec run!(
  Skuld.Comp.Types.computation(),
  keyword()
) :: term()

Run a computation with FiberPool scheduling, extracting the result.

Raises if the result is a suspension or error sentinel.

scope(comp, opts \\ [])

Create a structured concurrency scope.

All fibers submitted within the scope will be awaited before the scope returns. If the scope body completes normally, all fibers are awaited and their results discarded (the scope returns the body's result). If the scope body errors, all fibers are cancelled.

Options

  • :on_exit - Optional callback fn result, handles -> computation() called before awaiting fibers. Can be used for custom cleanup.

Example

FiberPool.scope(comp do
  h1 <- FiberPool.fiber(work1())
  h2 <- FiberPool.fiber(work2())
  # Both h1 and h2 will complete before scope exits
  FiberPool.await!(h1)
end)

task(thunk, opts \\ [])

@spec task(
  (-> term()),
  keyword()
) :: Skuld.Comp.Types.computation()

Run a thunk as a BEAM Task (parallel, separate process).

The thunk runs in a separate BEAM process, allowing true parallelism for CPU-bound work. Returns a handle that can be awaited just like fiber handles.

Important: The thunk is a zero-arity function, not a computation. Effects do not work inside tasks because they run in a different process. Extract any values you need from Reader/State before constructing the thunk.

Options

  • :timeout - Task timeout in milliseconds (default: 5000)

Example

comp do
  # Extract any needed values from effects first
  config <- Reader.ask(:config)

  # CPU-bound work runs in parallel - thunk captures what it needs
  h1 <- FiberPool.task(fn -> expensive_cpu_work(config) end)
  h2 <- FiberPool.task(fn -> another_cpu_work(config) end)

  # Await results
  r1 <- FiberPool.await!(h1)
  r2 <- FiberPool.await!(h2)

  {r1, r2}
end

with_handler(comp, opts \\ [])

Install the FiberPool handler for a computation.

The handler collects fiber submissions and await operations. Use run/1 or run!/1 to execute the computation with full fiber scheduling.