Skuld.Effects.Channel (skuld v0.2.3)

Bounded channel with suspending put/take operations and error propagation.

Channels provide backpressure-aware communication between fibers:

  • When a channel is full, put suspends the fiber until space is available
  • When a channel is empty, take suspends the fiber until an item arrives
  • Error state propagates to all consumers (sticky error)

Channel States

  • :open - normal operation
  • :closed - producer finished normally (consumers drain buffer then get :closed)
  • {:error, reason} - producer failed, error propagates to all consumers

Usage

Channels must be run within a FiberPool with the channel handler installed:

comp do
  ch <- Channel.new(10)

  # Producer fiber
  producer <- FiberPool.fiber(comp do
    Enum.each(1..100, fn i ->
      _ <- Channel.put(ch, i)  # Suspends if buffer full
    end)
    Channel.close(ch)
  end)

  # Consumer fiber
  consumer <- FiberPool.fiber(comp do
    consume_loop(ch)
  end)

  FiberPool.await_all!([producer, consumer])
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> FiberPool.run()
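
The usage example calls consume_loop(ch) without defining it. As a rough sketch of the shape such a loop might take, the module below recurses over a plain-Elixir stand-in for a channel and handles the three results documented for take. ConsumeLoopSketch and its list-backed take/1 are hypothetical names introduced for illustration only; a real loop would bind the result of Channel.take(ch) inside comp instead.

```elixir
defmodule ConsumeLoopSketch do
  # Hypothetical stand-in for a channel: {remaining_items, terminal_result}.
  # terminal_result is what take reports once the items run out:
  # either :closed or {:error, reason}.
  def take({[item | rest], terminal}), do: {{:ok, item}, {rest, terminal}}
  def take({[], terminal} = source), do: {terminal, source}

  # Collects items until the source reports :closed or an error.
  def consume_loop(source, acc \\ []) do
    case take(source) do
      {{:ok, item}, next} -> consume_loop(next, [item | acc])
      {:closed, _next} -> {:ok, Enum.reverse(acc)}
      {{:error, reason}, _next} -> {:error, reason, Enum.reverse(acc)}
    end
  end
end
```

The point of the sketch is the three-way match: keep looping on {:ok, item}, stop cleanly on :closed, and stop with the producer's reason on {:error, reason}.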

Error Propagation

When a producer encounters an error, it can signal this to consumers:

case fetch_data() do
  {:ok, items} ->
    Enum.each(items, fn i -> Channel.put(ch, i) end)
    Channel.close(ch)
  {:error, reason} ->
    Channel.error(ch, reason)  # All consumers will see this error!
end

Consumers receive {:error, reason} from take while the channel is in the error state. The error is sticky: once set, every subsequent take returns it, so a consumer that arrives late still sees it.

Summary

Functions

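  • close(handle) - Close the channel (signal normal completion).
  • closed?(handle) - Check if the channel is closed.
  • error(handle, reason) - Put the channel into error state.
  • errored?(handle) - Check if the channel is in error state.
  • new(capacity) - Create a new bounded channel with the given capacity.
  • peek(handle) - Peek at the next item without removing it.
  • put(handle, item) - Put an item into the channel.
  • put_async(handle, computation) - Put a computation into the channel asynchronously.
  • stats(handle) - Get channel statistics (for debugging/metrics).
  • take(handle) - Take an item from the channel.
  • take_async(handle) - Take from a channel with async fibers, awaiting the result.
  • with_handler(comp) - Install the channel handler for a computation.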

Functions

close(handle)

@spec close(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Close the channel (signal normal completion).

After closing:

  • New put operations return {:error, :closed}
  • take continues to drain the buffer, then returns :closed
  • Waiting takers (when buffer empty) are woken with :closed

Close is idempotent - closing an already closed or errored channel is a no-op.

Example

comp do
  # Producer finishes
  _ <- Channel.close(ch)
  :ok
end
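
The close rules listed above can be traced with a small pure model. This is a toy sketch in plain Elixir, not Skuld's implementation: CloseSketch, its map layout, and the :suspend marker are assumptions made for illustration, and capacity is ignored.

```elixir
defmodule CloseSketch do
  # Toy model: a map holding the channel state and a FIFO buffer (a list).
  def new, do: %{state: :open, buffer: []}

  # put is accepted while open and rejected once the channel is closed.
  def put(%{state: :open} = ch, item), do: {:ok, %{ch | buffer: ch.buffer ++ [item]}}
  def put(%{state: :closed} = ch, _item), do: {{:error, :closed}, ch}

  # Closing an open channel marks it closed; any other state is left as-is.
  def close(%{state: :open} = ch), do: %{ch | state: :closed}
  def close(ch), do: ch

  # take drains buffered items first, and only then reports :closed.
  def take(%{buffer: [item | rest]} = ch), do: {{:ok, item}, %{ch | buffer: rest}}
  def take(%{state: :closed, buffer: []} = ch), do: {:closed, ch}
  # An open, empty channel would suspend the fiber; the model only says so.
  def take(%{state: :open, buffer: []} = ch), do: {:suspend, ch}
end
```

With two buffered items, closing and then taking three times yields the two items followed by :closed, while a put after the close is rejected with {:error, :closed}.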

closed?(handle)

@spec closed?(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Check if the channel is closed.

error(handle, reason)

@spec error(Skuld.Effects.Channel.Handle.t(), term()) ::
  Skuld.Comp.Types.computation()

Put the channel into error state.

After erroring:

  • All waiting takers are woken with {:error, reason}
  • All waiting putters are woken with {:error, reason}
  • All future take operations return {:error, reason} (sticky!)
  • All future put operations return {:error, reason}

error is idempotent: the first error wins, and later calls leave the stored reason unchanged.

Example

comp do
  case fetch_data() do
    {:ok, data} -> process(data)
    {:error, reason} ->
      # Propagate error to all consumers
      _ <- Channel.error(ch, reason)
  end
end
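
To make "sticky" and "first error wins" concrete, here is a toy model in plain Elixir. It is a sketch of the documented behaviour, not Skuld's code: StickyErrorSketch, the map layout, and the :suspend marker are assumptions introduced for illustration. Following the rule that all future take operations return the error, the model reports the error even when items are still buffered.

```elixir
defmodule StickyErrorSketch do
  # Toy model: channel state plus buffered items.
  def new(items \\ []), do: %{state: :open, buffer: items}

  # First error wins: only an open channel moves into the error state.
  def error(%{state: :open} = ch, reason), do: %{ch | state: {:error, reason}}
  def error(%{state: {:error, _first}} = ch, _reason), do: ch

  # Once errored, every take reports the same reason, buffered items or not.
  def take(%{state: {:error, reason}}), do: {:error, reason}
  def take(%{state: :open, buffer: [item | _]}), do: {:ok, item}
  def take(%{state: :open, buffer: []}), do: :suspend

  # put on an errored channel reports the same reason as take.
  def put(%{state: {:error, reason}}, _item), do: {:error, reason}
  def put(%{state: :open}, _item), do: :ok
end
```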

errored?(handle)

@spec errored?(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Check if the channel is in error state.

new(capacity)

Create a new bounded channel with the given capacity.

The capacity must be a positive integer. Returns a Channel.Handle that can be used for put/take operations.

Example

comp do
  ch <- Channel.new(10)
  # ch is now a Channel.Handle
end

peek(handle)

@spec peek(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Peek at the next item without removing it.

Returns:

  • {:ok, item} - next item in buffer
  • :empty - buffer is empty (channel still open)
  • :closed - channel is closed and buffer is empty
  • {:error, reason} - channel is in error state

Unlike take, peek never suspends.

Example

comp do
  result <- Channel.peek(ch)
  case result do
    {:ok, item} -> # item is available but not removed
    :empty -> # no items but channel is open
    :closed -> # channel finished
    {:error, reason} -> # error
  end
end
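
The four peek results amount to a lookup on channel state and buffer contents. The sketch below writes that lookup as a pure function over a modeled channel. PeekSketch and the map layout are hypothetical; checking the error state before the buffer, and returning {:ok, item} for a closed channel that still holds items, are assumptions read off the result descriptions rather than confirmed behaviour.

```elixir
defmodule PeekSketch do
  # Maps a modeled channel (state + buffer) to the result peek would report.
  # The buffer is never modified, which is what distinguishes peek from take.
  def peek(%{state: {:error, reason}}), do: {:error, reason}
  def peek(%{buffer: [item | _]}), do: {:ok, item}
  def peek(%{state: :open, buffer: []}), do: :empty
  def peek(%{state: :closed, buffer: []}), do: :closed
end
```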

put(handle, item)

@spec put(Skuld.Effects.Channel.Handle.t(), term()) :: Skuld.Comp.Types.computation()

Put an item into the channel.

Returns:

  • :ok - item was put successfully
  • {:error, :closed} - channel is closed
  • {:error, reason} - channel is in error state

If the channel buffer is full, the fiber suspends until space is available. If there are waiting takers, the item is handed off directly.

Example

comp do
  result <- Channel.put(ch, item)
  case result do
    :ok -> # item was put
    {:error, reason} -> # channel closed or errored
  end
end
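
The decision put makes (reject, hand off, enqueue, or suspend) can be sketched as a pure function. This is a toy model, not Skuld's implementation: PutSketch, the takers list of waiting fiber ids, the :handoff and :suspend markers, and the order in which the conditions are checked are all assumptions made for illustration.

```elixir
defmodule PutSketch do
  # Toy model: state, capacity, buffered items, and ids of fibers waiting in take.
  def new(capacity), do: %{state: :open, capacity: capacity, buffer: [], takers: []}

  # A closed or errored channel rejects the item.
  def put(%{state: :closed} = ch, _item), do: {{:error, :closed}, ch}
  def put(%{state: {:error, reason}} = ch, _item), do: {{:error, reason}, ch}

  # A waiting taker receives the item directly; the buffer is untouched.
  def put(%{takers: [taker | rest]} = ch, item),
    do: {{:handoff, taker, item}, %{ch | takers: rest}}

  # Room in the buffer: enqueue at the back.
  def put(%{buffer: buffer, capacity: cap} = ch, item) when length(buffer) < cap,
    do: {:ok, %{ch | buffer: buffer ++ [item]}}

  # Buffer full: the real operation suspends the fiber until space frees up.
  def put(ch, _item), do: {:suspend, ch}
end
```

With capacity 1, the first put is buffered and the second reports :suspend, which is the point where backpressure reaches the producer.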

put_async(handle, computation)

@spec put_async(Skuld.Effects.Channel.Handle.t(), Skuld.Comp.Types.computation()) ::
  Skuld.Comp.Types.computation()

Put a computation into the channel asynchronously.

Spawns a fiber to execute the computation and stores the fiber handle in the channel buffer. This enables ordered concurrent processing - computations execute concurrently but results are taken in put-order.

Returns:

  • :ok - fiber was spawned and handle stored
  • {:error, :closed} - channel is closed
  • {:error, reason} - channel is in error state

If the buffer is full, suspends until space is available (backpressure). The buffer size naturally limits the number of concurrent computations.

Example

comp do
  ch <- Channel.new(10)  # max 10 concurrent transforms

  # Producer puts computations - they start executing immediately
  _ <- Channel.put_async(ch, expensive_transform(item1))
  _ <- Channel.put_async(ch, expensive_transform(item2))

  # Consumer takes resolved values in put-order
  {:ok, result1} <- Channel.take_async(ch)
  {:ok, result2} <- Channel.take_async(ch)
end
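
The idea of running work concurrently while consuming results in submission order can be shown with Elixir's standard Task module. This is an analogy only: it uses OTP tasks rather than Skuld fibers and has no bounded buffer, and OrderedAsyncSketch with its put_all/take_all functions is a hypothetical name, not part of the Channel API.

```elixir
defmodule OrderedAsyncSketch do
  # Start every job right away, keeping the task handles in submission order.
  def put_all(funs), do: Enum.map(funs, &Task.async/1)

  # Await the handles front to back, so results come out in submission order
  # regardless of which job finished first.
  def take_all(tasks), do: Enum.map(tasks, &Task.await/1)
end

jobs = [
  fn ->
    # The first job is deliberately the slowest.
    Process.sleep(50)
    :slow
  end,
  fn -> :fast end
]

results = jobs |> OrderedAsyncSketch.put_all() |> OrderedAsyncSketch.take_all()
IO.inspect(results)
# prints [:slow, :fast]
```

The second job finishes first, yet its result is delivered second, because the consumer awaits the handles in the order they were stored.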

stats(handle)

@spec stats(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Get channel statistics (for debugging/metrics).

take(handle)

@spec take(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Take an item from the channel.

Returns:

  • {:ok, item} - got an item
  • :closed - channel is closed and buffer is empty
  • {:error, reason} - channel is in error state (sticky!)

If the channel buffer is empty and the channel is open, the fiber suspends until an item is available.

Example

comp do
  result <- Channel.take(ch)
  case result do
    {:ok, item} -> # process item
    :closed -> # channel finished
    {:error, reason} -> # error from producer
  end
end

take_async(handle)

@spec take_async(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Take from a channel with async fibers, awaiting the result.

Takes a fiber handle from the buffer and awaits its completion. Returns the fiber's result value, preserving put-order even when computations complete out of order.

Returns:

  • {:ok, value} - fiber completed successfully with value
  • :closed - channel is closed and buffer is empty
  • {:error, reason} - channel is in error state, or the fiber failed

Example

comp do
  input <- Stream.from_enum(items)
  output <- Channel.new(10)

  # Producer: put_async spawns transform fibers
  _ <- FiberPool.fiber(comp do
    Stream.each(input, fn item ->
      Channel.put_async(output, transform(item))
    end)
    Channel.close(output)
  end)

  # Consumer: take_async awaits in order
  collect_async_results(output, [])
end

Mixed Usage

If a non-async item is taken (one not put via put_async), it is returned as {:ok, item} without awaiting.
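
The mixed-usage rule is a dispatch on what was buffered. A minimal sketch using Elixir's standard Task struct in place of a Skuld fiber handle is shown below. MixedTakeSketch and resolve/1 are hypothetical names, and the sketch leaves out the failure case in which take_async returns {:error, reason}.

```elixir
defmodule MixedTakeSketch do
  # A task handle (standing in for a fiber handle) is awaited for its value.
  def resolve(%Task{} = task), do: {:ok, Task.await(task)}

  # Any other buffered item is returned as-is, without awaiting.
  def resolve(item), do: {:ok, item}
end

buffer = [Task.async(fn -> 1 + 1 end), :plain]
IO.inspect(Enum.map(buffer, &MixedTakeSketch.resolve/1))
# prints [ok: 2, ok: :plain]
```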

with_handler(comp)

Install the channel handler for a computation.

This initializes the channel state storage. The handler must be installed before any channel operation runs.

Example

comp do
  ch <- Channel.new(10)
  # ... channel operations
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> FiberPool.run()