Skuld.Effects.Channel (skuld v0.2.3)
View SourceBounded channel with suspending put/take operations and error propagation.
Channels provide backpressure-aware communication between fibers:
- When a channel is full,
putsuspends the fiber until space is available - When a channel is empty,
takesuspends the fiber until an item arrives - Error state propagates to all consumers (sticky error)
Channel States
:open- normal operation:closed- producer finished normally (consumers drain buffer then get:closed){:error, reason}- producer failed, error propagates to all consumers
Usage
Channels must be run within a FiberPool with the channel handler installed:
comp do
ch <- Channel.new(10)
# Producer fiber
producer <- FiberPool.fiber(comp do
Enum.each(1..100, fn i ->
_ <- Channel.put(ch, i) # Suspends if buffer full
end)
Channel.close(ch)
end)
# Consumer fiber
consumer <- FiberPool.fiber(comp do
consume_loop(ch)
end)
FiberPool.await_all!([producer, consumer])
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> FiberPool.run()Error Propagation
When a producer encounters an error, it can signal this to consumers:
case fetch_data() do
{:ok, items} ->
Enum.each(items, fn i -> Channel.put(ch, i) end)
Channel.close(ch)
{:error, reason} ->
Channel.error(ch, reason) # All consumers will see this error!
endConsumers will receive {:error, reason} from take when the channel
is in error state. The error is "sticky" - it doesn't get lost.
Summary
Functions
Close the channel (signal normal completion).
Check if the channel is closed.
Put the channel into error state.
Check if the channel is in error state.
Create a new bounded channel with the given capacity.
Peek at the next item without removing it.
Put an item into the channel.
Put a computation into the channel asynchronously.
Get channel statistics (for debugging/metrics).
Take an item from the channel.
Take from a channel with async fibers, awaiting the result.
Install the channel handler for a computation.
Functions
@spec close(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Close the channel (signal normal completion).
After closing:
- New
putoperations return{:error, :closed} takecontinues to drain the buffer, then returns:closed- Waiting takers (when buffer empty) are woken with
:closed
Close is idempotent - closing an already closed or errored channel is a no-op.
Example
comp do
# Producer finishes
_ <- Channel.close(ch)
:ok
end
@spec closed?(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Check if the channel is closed.
@spec error(Skuld.Effects.Channel.Handle.t(), term()) :: Skuld.Comp.Types.computation()
Put the channel into error state.
After erroring:
- All waiting takers are woken with
{:error, reason} - All waiting putters are woken with
{:error, reason} - All future
takeoperations return{:error, reason}(sticky!) - All future
putoperations return{:error, reason}
Error is idempotent - first error wins.
Example
comp do
case fetch_data() do
{:ok, data} -> process(data)
{:error, reason} ->
# Propagate error to all consumers
_ <- Channel.error(ch, reason)
end
end
@spec errored?(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Check if the channel is in error state.
@spec new(non_neg_integer()) :: Skuld.Comp.Types.computation()
Create a new bounded channel with the given capacity.
The capacity must be a positive integer. Returns a Channel.Handle
that can be used for put/take operations.
Example
comp do
ch <- Channel.new(10)
# ch is now a Channel.Handle
end
@spec peek(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Peek at the next item without removing it.
Returns:
{:ok, item}- next item in buffer:empty- buffer is empty (channel still open):closed- channel is closed and buffer is empty{:error, reason}- channel is in error state
Unlike take, peek never suspends.
Example
comp do
case Channel.peek(ch) do
{:ok, item} -> # item is available but not removed
:empty -> # no items but channel is open
:closed -> # channel finished
{:error, reason} -> # error
end
end
@spec put(Skuld.Effects.Channel.Handle.t(), term()) :: Skuld.Comp.Types.computation()
Put an item into the channel.
Returns:
:ok- item was put successfully{:error, :closed}- channel is closed{:error, reason}- channel is in error state
If the channel buffer is full, the fiber suspends until space is available. If there are waiting takers, the item is handed off directly.
Example
comp do
result <- Channel.put(ch, item)
case result do
:ok -> # item was put
{:error, reason} -> # channel closed or errored
end
end
@spec put_async(Skuld.Effects.Channel.Handle.t(), Skuld.Comp.Types.computation()) :: Skuld.Comp.Types.computation()
Put a computation into the channel asynchronously.
Spawns a fiber to execute the computation and stores the fiber handle in the channel buffer. This enables ordered concurrent processing - computations execute concurrently but results are taken in put-order.
Returns:
:ok- fiber was spawned and handle stored{:error, :closed}- channel is closed{:error, reason}- channel is in error state
If the buffer is full, suspends until space is available (backpressure). The buffer size naturally limits the number of concurrent computations.
Example
comp do
ch <- Channel.new(10) # max 10 concurrent transforms
# Producer puts computations - they start executing immediately
_ <- Channel.put_async(ch, expensive_transform(item1))
_ <- Channel.put_async(ch, expensive_transform(item2))
# Consumer takes resolved values in put-order
{:ok, result1} <- Channel.take_async(ch)
{:ok, result2} <- Channel.take_async(ch)
end
@spec stats(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Get channel statistics (for debugging/metrics).
@spec take(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Take an item from the channel.
Returns:
{:ok, item}- got an item:closed- channel is closed and buffer is empty{:error, reason}- channel is in error state (sticky!)
If the channel buffer is empty and the channel is open, the fiber suspends until an item is available.
Example
comp do
case Channel.take(ch) do
{:ok, item} -> # process item
:closed -> # channel finished
{:error, reason} -> # error from producer
end
end
@spec take_async(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Take from a channel with async fibers, awaiting the result.
Takes a fiber handle from the buffer and awaits its completion. Returns the fiber's result value, preserving put-order even when computations complete out of order.
Returns:
{:ok, value}- fiber completed successfully with value:closed- channel is closed and buffer is empty{:error, reason}- channel errored OR fiber failed
Example
comp do
input <- Stream.from_enum(items)
output <- Channel.new(10)
# Producer: put_async spawns transform fibers
_ <- FiberPool.fiber(comp do
Stream.each(input, fn item ->
Channel.put_async(output, transform(item))
end)
Channel.close(output)
end)
# Consumer: take_async awaits in order
collect_async_results(output, [])
endMixed Usage
If a non-async item is taken (one not put via put_async), it is
returned as {:ok, item} without awaiting.
@spec with_handler(Skuld.Comp.Types.computation()) :: Skuld.Comp.Types.computation()
Install the channel handler for a computation.
This initializes the channel state storage. Must be used before any channel operations.
Example
comp do
ch <- Channel.new(10)
# ... channel operations
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> FiberPool.run()