Skuld.AsyncComputation (skuld v0.2.3)
Run a computation in a separate process, bridging yields, throws, and results back to the calling process via messages.
This is for running effectful computations from non-effectful code (e.g., LiveView).
If you're inside a computation and want concurrency, use Skuld.Effects.FiberPool instead.
Messages
The runner sends messages to the caller in the form {AsyncComputation, tag, result}:
- %ExternalSuspend{value: v, data: d, resume: nil} - computation yielded, waiting for resume
- %Throw{error: e} - computation threw an error
- %Cancelled{reason: r} - computation was cancelled
- Any other value - computation completed successfully
The ExternalSuspend.data field contains any decorations added by scoped effects
(e.g., EffectLogger attaches its log here). The resume field is always nil
in IPC messages since resume functions can't be sent between processes.
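The four message shapes above can be folded into one tagged result before dispatching. The following is a minimal sketch, not part of Skuld: the module AsyncResultSketch and its functions classify/1 and await/2 are hypothetical. It assumes that AsyncComputation in the message tuple is the alias for Skuld.AsyncComputation and that the structs live under Skuld.Comp, as the specs on this page suggest. It matches on the __struct__ key only so that it compiles without Skuld loaded; in an application you would match on %ExternalSuspend{}, %Throw{} and %Cancelled{} directly.

```elixir
defmodule AsyncResultSketch do
  @moduledoc false
  # Hypothetical helper for illustration; not part of Skuld.

  # Computation yielded and is waiting for a resume.
  def classify(%{__struct__: Skuld.Comp.ExternalSuspend, value: value, data: data}),
    do: {:suspended, value, data}

  # Computation threw an error.
  def classify(%{__struct__: Skuld.Comp.Throw, error: error}), do: {:thrown, error}

  # Computation was cancelled.
  def classify(%{__struct__: Skuld.Comp.Cancelled, reason: reason}), do: {:cancelled, reason}

  # Any other value means the computation completed.
  def classify(value), do: {:done, value}

  # Wait for the next message carrying `tag` and classify its result.
  # Assumes the first tuple element is the atom Skuld.AsyncComputation.
  def await(tag, timeout \\ 5_000) do
    receive do
      {Skuld.AsyncComputation, ^tag, result} -> classify(result)
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Outside a LiveView, for example in a test or a script, await/2 could stand in for a handle_info clause: call it with the tag passed to start/2 and branch on the tagged tuple it returns.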
Example
# Build computation with handlers
computation =
  comp do
    result <- Command.execute(%CreateTodo{title: "Buy milk"})
    result
  end
  |> Command.with_handler(&DomainHandler.handle/1)
  |> Reader.with_handler(context, tag: CommandContext)
  |> EctoPersist.with_handler(Repo)

# Start async - will add Yield and Throw handlers
{:ok, runner} = AsyncComputation.start(computation, tag: :create_todo)

# Or start sync for fast-yielding computations
{:ok, runner, %ExternalSuspend{value: :ready}} =
  AsyncComputation.start_sync(computation, tag: :create_todo)

# In handle_info - single clause handles all messages for a tag:
def handle_info({AsyncComputation, :create_todo, result}, socket) do
  case result do
    %ExternalSuspend{value: value, data: data} ->
      handle_yield(value, data, socket)

    %Throw{error: error} ->
      handle_error(error, socket)

    %Cancelled{reason: reason} ->
      handle_cancelled(reason, socket)

    value ->
      handle_success(value, socket)
  end
end
With Yields
# Computation that yields for user input
computation =
  comp do
    name <- Yield.yield(:get_name)
    email <- Yield.yield(:get_email)
    create_user(name, email)
  end
  |> ...handlers...

# Keep the runner somewhere handle_info can reach it (here: socket assigns)
{:ok, runner} = AsyncComputation.start(computation, tag: :create_user)

# Handle yields
# (assumes the runner was stored in socket.assigns.runner after start/2)
def handle_info({AsyncComputation, :create_user, %ExternalSuspend{value: :get_name}}, socket) do
  # Maybe wait for user input, then:
  AsyncComputation.resume(socket.assigns.runner, "Alice")
  {:noreply, socket}
end

def handle_info({AsyncComputation, :create_user, %ExternalSuspend{value: :get_email}}, socket) do
  AsyncComputation.resume(socket.assigns.runner, "alice@example.com")
  {:noreply, socket}
end

def handle_info({AsyncComputation, :create_user, {:ok, user}}, socket) do
  {:noreply, assign(socket, user: user)}
end
Summary
Functions
cancel/1 - Cancel a running computation (async).
cancel_sync/2 - Cancel a running computation and wait for it to complete.
resume/2 - Resume a yielded computation with a value (async).
resume_sync/3 - Resume a yielded computation and wait synchronously for the next response.
start/2 - Start a computation in a separate process.
start_sync/2 - Start a computation and wait synchronously for the first response.
Types
Functions
@spec cancel(t()) :: :ok
Cancel a running computation (async).
Sends a cancel signal to the computation. The computation will invoke leave_scope
for all active scoped effects (allowing cleanup), then send a
{AsyncComputation, tag, %Cancelled{reason: :cancelled}} message to the caller.
Use cancel_sync/2 if you need to wait for the cancellation to complete.
@spec cancel_sync(t(), keyword()) :: Skuld.Comp.Cancelled.t() | {:error, :timeout}
Cancel a running computation and wait for it to complete.
Like cancel/1, but blocks until the computation has finished its cleanup
(invoking leave_scope for all active scoped effects) and returns the result.
This can be called from any process - the response will be sent to the calling
process, not necessarily the original caller from start/2.
Options
- :timeout - Maximum time to wait in ms (default: 5000)
Returns
- %Cancelled{reason: :cancelled} - computation was cancelled successfully
- {:error, :timeout} - timed out waiting for cancellation to complete
Example
{:ok, runner, %ExternalSuspend{value: :ready}} =
  AsyncComputation.start_sync(computation, tag: :worker)

# Cancel and wait for cleanup to finish
%Cancelled{reason: :cancelled} = AsyncComputation.cancel_sync(runner)
Resume a yielded computation with a value (async).
Call this after receiving a {AsyncComputation, tag, %ExternalSuspend{}} message.
The next response will arrive via message to the caller (or :reply_to if specified).
Options
- :reply_to - Process to send the response to (default: original caller from start)
@spec resume_sync(t(), term(), keyword()) :: Skuld.Comp.ExternalSuspend.t() | Skuld.Comp.Throw.t() | Skuld.Comp.Cancelled.t() | term() | {:error, :timeout}
Resume a yielded computation and wait synchronously for the next response.
Blocks until the computation yields again, completes, throws, or times out.
This can be called from any process - the response will be sent to the calling
process, not necessarily the original caller from start/2.
Options
- :timeout - Maximum time to wait in ms (default: 5000)
Returns
- %ExternalSuspend{value: v, data: d} - computation yielded again
- %Throw{error: e} - computation threw
- %Cancelled{reason: r} - computation cancelled
- Any other value - computation completed
- {:error, :timeout} - timed out waiting for response
Example
{:ok, runner} = AsyncComputation.start(computation, tag: :cmd)

# First yield arrives via message
receive do
  {AsyncComputation, :cmd, %ExternalSuspend{value: :ready}} -> :ok
end

# Now resume and wait synchronously
case AsyncComputation.resume_sync(runner, %SomeCommand{}) do
  # ready for next command
  %ExternalSuspend{value: :ready} -> :ready
  # something went wrong
  %Throw{error: e} -> {:error, e}
  # was cancelled
  %Cancelled{reason: r} -> {:cancelled, r}
  # timed out waiting for a response
  {:error, :timeout} -> {:error, :timeout}
  # computation finished with value
  value -> {:ok, value}
end
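When several inputs are known up front, the return shapes listed for resume_sync/3 lend themselves to a small driver loop. The following is a minimal sketch under stated assumptions, not part of Skuld: the module ResumeLoopSketch and its drive/2 function are hypothetical, and the resume function is passed in as an argument so the sketch runs without Skuld loaded. In an application it would presumably be something like &AsyncComputation.resume_sync(runner, &1). The structs are matched via their __struct__ key for the same reason.

```elixir
defmodule ResumeLoopSketch do
  @moduledoc false
  # Hypothetical helper for illustration; not part of Skuld.

  # Feeds `inputs` one at a time to `resume_fun` until the computation
  # stops yielding or the inputs run out.
  #
  # Returns:
  #   {:suspended, last_yielded_value} - inputs ran out while still yielding
  #   {:done, value}                   - computation completed
  #   {:thrown, error}                 - computation threw
  #   {:cancelled, reason}             - computation was cancelled
  #   {:error, :timeout}               - a resume timed out
  def drive(resume_fun, inputs) when is_function(resume_fun, 1) do
    Enum.reduce_while(inputs, {:suspended, nil}, fn input, _acc ->
      case resume_fun.(input) do
        {:error, :timeout} ->
          {:halt, {:error, :timeout}}

        %{__struct__: Skuld.Comp.ExternalSuspend, value: value} ->
          {:cont, {:suspended, value}}

        %{__struct__: Skuld.Comp.Throw, error: error} ->
          {:halt, {:thrown, error}}

        %{__struct__: Skuld.Comp.Cancelled, reason: reason} ->
          {:halt, {:cancelled, reason}}

        value ->
          {:halt, {:done, value}}
      end
    end)
  end
end
```

Because the documented return type is any term or {:error, :timeout}, a computation that completes with the literal value {:error, :timeout} cannot be told apart from a timeout by a loop like this one.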
@spec start(Skuld.Comp.Types.computation(), keyword()) :: {:ok, t()}
Start a computation in a separate process.
The computation will have Throw.with_handler/1 and Yield.with_handler/1
added automatically (outermost). Add your other handlers before calling start.
Options
- :tag - Required. Tag for messages, e.g. :create_todo
- :caller - Process to send messages to (default: self())
- :link - Whether to link the runner process (default: true)
Returns
{:ok, runner} where runner is used with resume/2 and cancel/1.
@spec start_sync(Skuld.Comp.Types.computation(), keyword()) :: {:ok, t(), term()} | {:error, :timeout}
Start a computation and wait synchronously for the first response.
Use this when you know the computation will quickly yield after setup (e.g., a command processor that immediately yields waiting for commands). Avoids dealing with async messages for the initial handshake.
Options
Same as start/2, plus:
- :timeout - Maximum time to wait in ms (default: 5000)
Returns
- {:ok, runner, result} where result is one of:
  - %ExternalSuspend{value: v, data: d} - computation yielded
  - %Throw{error: e} - computation threw
  - %Cancelled{reason: r} - computation cancelled
  - Any other value - computation completed
- {:error, :timeout} - timed out waiting for first response
Example
# Command processor that yields immediately for commands
{:ok, runner, %ExternalSuspend{value: :ready}} =
  command_processor
  |> Reader.with_handler(context)
  |> AsyncComputation.start_sync(tag: :processor)

# Now resume synchronously for quick commands
%ExternalSuspend{value: :ready} = AsyncComputation.resume_sync(runner, %QuickCommand{})
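The hard match in the example above raises a MatchError if the first response is a timeout, a throw, or anything other than the expected yield. Where that is not acceptable, the result of start_sync/2 can be checked with a case instead. The following is a minimal sketch, not part of Skuld: the module HandshakeSketch and its check/2 function are hypothetical, and the :ready value is simply the one used in the examples on this page. The structs are matched via their __struct__ key only so the sketch compiles without Skuld loaded.

```elixir
defmodule HandshakeSketch do
  @moduledoc false
  # Hypothetical helper for illustration; not part of Skuld.

  # Takes the return value of start_sync/2 and accepts it only if the
  # first response is a yield of `expected`.
  def check(start_result, expected \\ :ready) do
    case start_result do
      {:ok, runner, %{__struct__: Skuld.Comp.ExternalSuspend, value: ^expected}} ->
        {:ok, runner}

      {:ok, _runner, %{__struct__: Skuld.Comp.Throw, error: error}} ->
        {:error, {:thrown, error}}

      {:ok, _runner, %{__struct__: Skuld.Comp.Cancelled, reason: reason}} ->
        {:error, {:cancelled, reason}}

      # A different yield, or a computation that completed straight away.
      {:ok, _runner, other} ->
        {:error, {:unexpected, other}}

      {:error, :timeout} ->
        {:error, :timeout}
    end
  end
end
```

A caller would pipe the start_sync/2 result into check/1 and continue with resume_sync/3 only on {:ok, runner}.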