Skuld.AsyncComputation (skuld v0.2.3)

View Source

Run a computation in a separate process, bridging yields, throws, and results back to the calling process via messages.

This is for running effectful computations from non-effectful code (e.g., LiveView). If you're inside a computation and want concurrency, use Skuld.Effects.FiberPool instead.

Messages

The runner sends messages to the caller in the form {AsyncComputation, tag, result}:

  • %ExternalSuspend{value: v, data: d, resume: nil} - computation yielded, waiting for resume
  • %Throw{error: e} - computation threw an error
  • %Cancelled{reason: r} - computation was cancelled
  • Any other value - computation completed successfully

The ExternalSuspend.data field contains any decorations added by scoped effects (e.g., EffectLogger attaches its log here). The resume field is always nil in IPC messages since resume functions can't be sent between processes.

Example

# Build computation with handlers
computation =
  comp do
    result <- Command.execute(%CreateTodo{title: "Buy milk"})
    result
  end
  |> Command.with_handler(&DomainHandler.handle/1)
  |> Reader.with_handler(context, tag: CommandContext)
  |> EctoPersist.with_handler(Repo)

# Start async - will add Yield and Throw handlers
{:ok, runner} = AsyncComputation.start(computation, tag: :create_todo)

# Or start sync for fast-yielding computations
{:ok, runner, %ExternalSuspend{value: :ready}} =
  AsyncComputation.start_sync(computation, tag: :create_todo)

# In handle_info - single clause handles all messages for a tag:
def handle_info({AsyncComputation, :create_todo, result}, socket) do
  case result do
    %ExternalSuspend{value: value, data: data} ->
      handle_yield(value, data, socket)

    %Throw{error: error} ->
      handle_error(error, socket)

    %Cancelled{reason: reason} ->
      handle_cancelled(reason, socket)

    value ->
      handle_success(value, socket)
  end
end

With Yields

# Computation that yields for user input
computation =
  comp do
    name <- Yield.yield(:get_name)
    email <- Yield.yield(:get_email)
    create_user(name, email)
  end
  |> ...handlers...

{:ok, runner} = AsyncComputation.start(computation, tag: :create_user)

# Handle yields
def handle_info({AsyncComputation, :create_user, %ExternalSuspend{value: :get_name}}, socket) do
  # Maybe wait for user input, then:
  AsyncComputation.resume(runner, "Alice")
  {:noreply, socket}
end

def handle_info({AsyncComputation, :create_user, %ExternalSuspend{value: :get_email}}, socket) do
  AsyncComputation.resume(runner, "alice@example.com")
  {:noreply, socket}
end

def handle_info({AsyncComputation, :create_user, {:ok, user}}, socket) do
  {:noreply, assign(socket, user: user)}
end

Summary

Functions

Cancel a running computation (async).

Cancel a running computation and wait for it to complete.

Resume a yielded computation with a value (async).

Resume a yielded computation and wait synchronously for the next response.

Start a computation in a separate process.

Start a computation and wait synchronously for the first response.

Types

t()

@type t() :: %Skuld.AsyncComputation{
  caller: pid(),
  monitor_ref: reference(),
  pid: pid(),
  ref: reference(),
  tag: term()
}

Functions

cancel(async_computation)

@spec cancel(t()) :: :ok

Cancel a running computation (async).

Sends a cancel signal to the computation. The computation will invoke leave_scope for all active scoped effects (allowing cleanup), then send a {AsyncComputation, tag, %Cancelled{reason: :cancelled}} message to the caller.

Use cancel_sync/2 if you need to wait for the cancellation to complete.

cancel_sync(async_computation, opts \\ [])

@spec cancel_sync(
  t(),
  keyword()
) :: Skuld.Comp.Cancelled.t() | {:error, :timeout}

Cancel a running computation and wait for it to complete.

Like cancel/1, but blocks until the computation has finished its cleanup (invoking leave_scope for all active scoped effects) and returns the result.

This can be called from any process - the response will be sent to the calling process, not necessarily the original caller from start/2.

Options

  • :timeout - Maximum time to wait in ms (default: 5000)

Returns

  • %Cancelled{reason: :cancelled} - computation was cancelled successfully
  • {:error, :timeout} - timed out waiting for cancellation to complete

Example

{:ok, runner, %ExternalSuspend{value: :ready}} =
  AsyncComputation.start_sync(computation, tag: :worker)

# Cancel and wait for cleanup to finish
%Cancelled{reason: :cancelled} = AsyncComputation.cancel_sync(runner)

resume(async_computation, value, opts \\ [])

@spec resume(t(), term(), keyword()) :: :ok

Resume a yielded computation with a value (async).

Call this after receiving a {AsyncComputation, tag, %ExternalSuspend{}} message. The next response will arrive via message to the caller (or :reply_to if specified).

Options

  • :reply_to - Process to send the response to (default: original caller from start)

resume_sync(async_computation, value, opts \\ [])

@spec resume_sync(t(), term(), keyword()) ::
  Skuld.Comp.ExternalSuspend.t()
  | Skuld.Comp.Throw.t()
  | Skuld.Comp.Cancelled.t()
  | term()
  | {:error, :timeout}

Resume a yielded computation and wait synchronously for the next response.

Blocks until the computation yields again, completes, throws, or times out.

This can be called from any process - the response will be sent to the calling process, not necessarily the original caller from start/2.

Options

  • :timeout - Maximum time to wait in ms (default: 5000)

Returns

  • %ExternalSuspend{value: v, data: d} - computation yielded again
  • %Throw{error: e} - computation threw
  • %Cancelled{reason: r} - computation cancelled
  • Any other value - computation completed
  • {:error, :timeout} - timed out waiting for response

Example

{:ok, runner} = AsyncComputation.start(computation, tag: :cmd)

# First yield arrives via message
receive do
  {AsyncComputation, :cmd, %ExternalSuspend{value: :ready}} -> :ok
end

# Now resume and wait synchronously
case AsyncComputation.resume_sync(runner, %SomeCommand{}) do
  %ExternalSuspend{value: :ready} -> # ready for next command
  %Throw{error: e} -> # something went wrong
  %Cancelled{reason: r} -> # was cancelled
  value -> # computation finished with value
end

start(computation, opts)

@spec start(
  Skuld.Comp.Types.computation(),
  keyword()
) :: {:ok, t()}

Start a computation in a separate process.

The computation will have Throw.with_handler/1 and Yield.with_handler/1 added automatically (outermost). Add your other handlers before calling start.

Options

  • :tag - Required. Tag for messages, e.g. :create_todo
  • :caller - Process to send messages to (default: self())
  • :link - Whether to link the runner process (default: true)

Returns

{:ok, runner} where runner is used with resume/2 and cancel/1.

start_sync(computation, opts)

@spec start_sync(
  Skuld.Comp.Types.computation(),
  keyword()
) :: {:ok, t(), term()} | {:error, :timeout}

Start a computation and wait synchronously for the first response.

Use this when you know the computation will quickly yield after setup (e.g., a command processor that immediately yields waiting for commands). Avoids dealing with async messages for the initial handshake.

Options

Same as start/2, plus:

  • :timeout - Maximum time to wait in ms (default: 5000)

Returns

  • {:ok, runner, result} where result is one of:
    • %ExternalSuspend{value: v, data: d} - computation yielded
    • %Throw{error: e} - computation threw
    • %Cancelled{reason: r} - computation cancelled
    • Any other value - computation completed
  • {:error, :timeout} - timed out waiting for first response

Example

# Command processor that yields immediately for commands
{:ok, runner, %ExternalSuspend{value: :ready}} =
  command_processor
  |> Reader.with_handler(context)
  |> AsyncComputation.start_sync(tag: :processor)

# Now resume synchronously for quick commands
%ExternalSuspend{value: :ready} = AsyncComputation.resume_sync(runner, %QuickCommand{})