Musubi.Async (musubi v0.3.0)

Async lifecycle API for Musubi stores. Exposes assign_async/3,4, start_async/3,4, cancel_async/2,3, and stream_async/3,4.

Public API surface (frozen for M5+)

  • assign_async/3,4 — spawn a task whose result writes one or more Musubi.AsyncResult values into socket.assigns.
  • start_async/3,4 — spawn a task whose result is delivered to the store's handle_async/3 callback. socket.assigns is not mutated by the call itself.
  • cancel_async/2,3 — kill an in-flight task by name/key or by %AsyncResult{} value. Always produces a Musubi.AsyncResult.failed/2 write, either driven by the resulting :DOWN or pre-written when called via the %AsyncResult{} variant.
  • stream_async/3,4 — composite assign_async + Musubi.Stream.stream/4: writes a status flag into socket.assigns and seeds the stream slot in one envelope.

Task supervision

Tasks run under Musubi.AsyncSupervisor (a Task.Supervisor started by Musubi.Application). Pass :supervisor to override per-call:

Musubi.Async.assign_async(socket, :profile, fun, supervisor: MyApp.TaskSup)
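
A minimal sketch of where a custom supervisor such as MyApp.TaskSup could come from, assuming it is an ordinary Task.Supervisor started by the host application. Only Elixir's standard library is used; the name MyApp.TaskSup is taken from the call above, and none of this is Musubi's own code.

```elixir
# Start a named Task.Supervisor, as an application supervision tree typically would.
children = [
  {Task.Supervisor, name: MyApp.TaskSup}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

# Smoke test: the supervisor is registered and can run an unlinked task.
task = Task.Supervisor.async_nolink(MyApp.TaskSup, fn -> :ready end)
result = Task.await(task)

# Once it is running, the name can be handed to Musubi per call:
#   Musubi.Async.assign_async(socket, :profile, fun, supervisor: MyApp.TaskSup)
```

Starting the supervisor in the application's own tree keeps its lifetime independent of any single page process.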

Reserved socket-private key

Per-task tracking lives at socket.private[:__musubi_async_refs__] and is runtime-internal. Use Musubi.Async.private_refs_key/0 to introspect without hard-coding the literal.

Result classification

| Event | Terminal Musubi.AsyncResult value |
|---|---|
| {:ok, val} | ok(prior, val) |
| {:error, reason} | failed(prior, {:error, reason}) |
| task raises | failed(prior, {:exit, {kind, reason, stacktrace}}) |
| task throws | failed(prior, {:exit, {{:nocatch, val}, st}}) |
| task exits with reason r | failed(prior, {:exit, r}) |
| :timeout fires | failed(prior, {:exit, :timeout}) |
| cancel_async/2,3 with reason r | failed(prior, {:exit, r}) |
| runtime no longer hosts originating node | lazy-discard; [:musubi, :async, :lazy_discard] |
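
The first five rows can be illustrated with a small standalone sketch. It shows one way a task wrapper could produce these shapes; it is not Musubi's implementation. ClassifySketch is a hypothetical module, plain tagged tuples stand in for Musubi.AsyncResult values, and the kind for a raise is assumed to be :error.

```elixir
defmodule ClassifySketch do
  # Runs `fun` and wraps its outcome as {:ok, return} or {:exit, reason}.
  def run(fun) when is_function(fun, 0) do
    try do
      {:ok, fun.()}
    rescue
      # Assumption: a raise is reported with kind :error.
      exception -> {:exit, {:error, exception, __STACKTRACE__}}
    catch
      :throw, value -> {:exit, {{:nocatch, value}, __STACKTRACE__}}
      :exit, reason -> {:exit, reason}
    end
  end

  # Maps a wrapped outcome to a terminal tag. Return shapes other than
  # {:ok, _} and {:error, _} are out of scope for this sketch.
  def terminal({:ok, {:ok, value}}), do: {:ok, value}
  def terminal({:ok, {:error, reason}}), do: {:failed, {:error, reason}}
  def terminal({:exit, reason}), do: {:failed, {:exit, reason}}
end

ok = ClassifySketch.terminal(ClassifySketch.run(fn -> {:ok, 1} end))
raised = ClassifySketch.terminal(ClassifySketch.run(fn -> raise "boom" end))
thrown = ClassifySketch.terminal(ClassifySketch.run(fn -> throw(:x) end))
exited = ClassifySketch.terminal(ClassifySketch.run(fn -> exit(:bye) end))
```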

Telemetry

Every async event emits [:musubi, :async, event], where event is one of :start | :stop | :exception | :cancel | :lazy_discard. Metadata always includes page_id, path, name, and kind (:assign | :start | :stream). See Musubi.Async.Telemetry for the canonical metadata builder.
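
A hedged sketch of a handler for these events, assuming the :telemetry package is available (:telemetry.attach_many/4 is its standard attach call). The module name MyApp.MusubiAsyncLogger, the handler id, and the log format are hypothetical. Only the four metadata keys listed above are read, and no assumption is made about measurements.

```elixir
defmodule MyApp.MusubiAsyncLogger do
  require Logger

  # The five event names listed above.
  @events for event <- [:start, :stop, :exception, :cancel, :lazy_discard],
              do: [:musubi, :async, event]

  def events, do: @events

  # Call once at application start; requires the :telemetry package.
  def attach do
    :telemetry.attach_many(
      "myapp-musubi-async-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event([:musubi, :async, event], _measurements, metadata, _config) do
    Logger.info(describe(event, metadata))
  end

  # Builds the log line from the metadata keys documented as always present.
  def describe(event, metadata) do
    fields = Map.take(metadata, [:page_id, :path, :name, :kind])
    "musubi async #{event}: #{inspect(fields)}"
  end
end
```

Passing the handler as a module function capture rather than an anonymous function avoids the performance warning :telemetry logs for local functions.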

Types

kind()

@type kind() :: :assign | :start | :stream

Async kind discriminator carried in tracking + telemetry metadata.

name_arg()

@type name_arg() :: term()

User-supplied name argument.

supervisor_ref()

@type supervisor_ref() :: module() | atom() | pid()

Acceptable Task.Supervisor reference — module name, registered atom, or pid.

tracking_entry()

@type tracking_entry() :: %{
  ref: reference(),
  pid: pid(),
  kind: kind(),
  keys: [atom()] | nil,
  prior: %{required(atom()) => Musubi.AsyncResult.t()},
  timer_ref: reference() | nil,
  cancel_reason: term() | nil,
  supervisor: supervisor_ref()
}

Per-task tracking entry stored under socket.private[:__musubi_async_refs__].

tracking_name()

@type tracking_name() :: atom() | [atom()]

Internal name a tracked task is filed under. Atom for start_async/stream_async; the key list for assign_async.

Functions

apply_task_down(socket, name, entry, raw_reason)

@spec apply_task_down(Musubi.Socket.t(), tracking_name(), tracking_entry(), term()) ::
  Musubi.Socket.t()

Resolves a :DOWN message for a tracked task. Called by Musubi.Page.Server. Writes Musubi.AsyncResult.failed(prior, {:exit, reason}) to every key managed by the tracking entry.

Honors a previously-stamped :cancel_reason (set by cancel_async/2,3 or :timeout) so the surfaced reason matches the operator-visible cause rather than the raw Process.exit/2 reason.
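
The difference between the raw exit reason and the stamped one can be seen in a standalone sketch using only the standard library. DownReasonSketch and the trimmed-down entry map are hypothetical, and killing the task with :kill is an assumption made for the demonstration, not a statement about how Musubi stops tasks.

```elixir
defmodule DownReasonSketch do
  # Prefer the reason stamped at cancel/timeout time over the raw exit reason.
  def surfaced_reason(%{cancel_reason: nil}, raw_reason), do: {:exit, raw_reason}
  def surfaced_reason(%{cancel_reason: stamped}, _raw_reason), do: {:exit, stamped}
end

{:ok, sup} = Task.Supervisor.start_link()
task = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)

# A trimmed-down tracking entry; the stamped reason mimics cancel_async's default.
entry = %{ref: task.ref, pid: task.pid, cancel_reason: {:shutdown, :cancel}}

Process.exit(task.pid, :kill)

# async_nolink monitors the task, so the caller receives a :DOWN message.
raw_reason =
  receive do
    {:DOWN, ref, :process, _pid, reason} when ref == entry.ref -> reason
  after
    1_000 -> raise "expected a :DOWN message for the killed task"
  end

surfaced = DownReasonSketch.surfaced_reason(entry, raw_reason)
```

Here the raw reason is :killed, while the surfaced reason carries the stamped {:shutdown, :cancel}.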

apply_task_result(socket, name, entry, classified)

@spec apply_task_result(Musubi.Socket.t(), tracking_name(), tracking_entry(), term()) ::
  Musubi.Socket.t()

Applies a classified assign_async/stream_async task result to the socket. Called by Musubi.Page.Server from handle_info.

classified is the wrapped task return: {:ok, user_return} or {:exit, reason_class}.

assign_async(socket, key_or_keys, fun, opts \\ [])

@spec assign_async(Musubi.Socket.t(), name_arg(), (-> term()), keyword()) ::
  Musubi.Socket.t()

Spawns a background task whose result writes Musubi.AsyncResult values into socket.assigns for the given key (or list of keys).

Synchronously writes Musubi.AsyncResult.loading(prior) per key before returning the socket. On task completion the runtime atomically updates each key to either Musubi.AsyncResult.ok(prior, value) (single-key: fun returned {:ok, value}; multi-key: fun returned {:ok, %{key1: v1, key2: v2}}) or Musubi.AsyncResult.failed(prior, reason).
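
The single-key and multi-key rules can be sketched as a pure function. FanOutSketch is a hypothetical module, tagged tuples stand in for Musubi.AsyncResult values, and raising on a key missing from the returned map is an assumption of this sketch rather than documented behaviour.

```elixir
defmodule FanOutSketch do
  # Single key: the returned value belongs to that key.
  def fan_out(key, {:ok, value}) when is_atom(key), do: %{key => {:ok, value}}

  # Key list: the returned map is split so each key receives its own value.
  # Assumption: a key missing from the map raises KeyError.
  def fan_out(keys, {:ok, %{} = values}) when is_list(keys) do
    Map.new(keys, fn key -> {key, {:ok, Map.fetch!(values, key)}} end)
  end

  # Failure: every managed key receives the same reason.
  def fan_out(key_or_keys, {:error, reason}) do
    Map.new(List.wrap(key_or_keys), fn key -> {key, {:failed, {:error, reason}}} end)
  end
end

single = FanOutSketch.fan_out(:profile, {:ok, 1})
multi = FanOutSketch.fan_out([:user, :org], {:ok, %{user: "u", org: "o"}})
failed = FanOutSketch.fan_out([:user, :org], {:error, :down})
```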

Options

  • :reset — true re-emits loading() (with no prior) for every managed key; a list of keys re-emits loading for that subset only. The prior task (if any) is cancelled.
  • :timeout — milliseconds; on expiry the task is killed and the result becomes failed(prior, {:exit, :timeout}).
  • :supervisor — Task.Supervisor name; defaults to Musubi.AsyncSupervisor.

Examples

socket = Musubi.Async.assign_async(socket, :profile, fn -> {:ok, fetch()} end)
socket = Musubi.Async.assign_async(socket, [:user, :org], fn -> {:ok, %{user: u, org: o}} end)
socket = Musubi.Async.assign_async(socket, :profile, fun, reset: true, timeout: 5_000)

cancel_async(socket, target)

@spec cancel_async(Musubi.Socket.t(), name_arg() | Musubi.AsyncResult.t()) ::
  Musubi.Socket.t()

Kills an in-flight task and resolves its tracked assigns to Musubi.AsyncResult.failed(prior, {:exit, reason}).

Three calling shapes:

  • cancel_async(socket, name) — by name (atom for start_async/stream_async, atom or key list for assign_async). Kills the pid; the resulting :DOWN message drives the failed write.
  • cancel_async(socket, name, reason) — same, with explicit reason.
  • cancel_async(socket, %AsyncResult{}, reason) — pre-writes failed/2 synchronously, then kills the task. Use when the caller already holds the %AsyncResult{} and wants the assign updated before returning.

Default reason is {:shutdown, :cancel}.

Emits [:musubi, :async, :cancel].

Examples

socket = Musubi.Async.cancel_async(socket, :profile)
socket = Musubi.Async.cancel_async(socket, :profile, :user_navigated_away)
socket = Musubi.Async.cancel_async(socket, async_result, :user_navigated_away)

cancel_async(socket, ar, reason)

@spec cancel_async(Musubi.Socket.t(), name_arg() | Musubi.AsyncResult.t(), term()) ::
  Musubi.Socket.t()

mark_timeout(socket, name)

@spec mark_timeout(Musubi.Socket.t(), tracking_name()) ::
  {Musubi.Socket.t(), tracking_entry()} | :error

Sets a tracking entry's :cancel_reason to :timeout and returns the updated socket together with the entry so the caller can kill the task pid. Called by Musubi.Page.Server when a {:musubi_async_timeout, ref} message fires.
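
A rough sketch of the stamping step, operating on a plain map instead of a socket. MarkTimeoutSketch is hypothetical, the tracking container is assumed to be a map keyed by tracking name, and returning :error for an unknown name is an assumption based only on the spec's return type.

```elixir
defmodule MarkTimeoutSketch do
  # Stamps :timeout on the entry filed under `name` and returns it to the caller.
  def mark_timeout(refs, name) when is_map(refs) do
    case Map.fetch(refs, name) do
      {:ok, entry} ->
        entry = %{entry | cancel_reason: :timeout}
        {Map.put(refs, name, entry), entry}

      :error ->
        # Assumption: nothing tracked under this name.
        :error
    end
  end
end

refs = %{profile: %{pid: self(), cancel_reason: nil}}
{new_refs, entry} = MarkTimeoutSketch.mark_timeout(refs, :profile)
missing = MarkTimeoutSketch.mark_timeout(refs, :unknown)
```

With the reason stamped before the kill, the later :DOWN message can be reported as {:exit, :timeout} instead of the raw exit reason.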

private_refs_key()

@spec private_refs_key() :: :__musubi_async_refs__

Returns the reserved socket-private key holding async-task tracking entries.

start_async(socket, name, fun, opts \\ [])

@spec start_async(Musubi.Socket.t(), atom(), (-> term()), keyword()) ::
  Musubi.Socket.t()

Spawns a background task whose result is routed to the store's handle_async(name, result, socket) callback.

socket.assigns is not mutated by this call — applications that want a visible loading indicator should write one explicitly (typically by also calling assign_async/3,4 for the same data).

A second start_async/3,4 with the same name silently overwrites the prior tracking ref (BDR-0019). The older task continues running and its result is lazy-discarded on arrival, accompanied by a [:musubi, :async, :lazy_discard] telemetry event.
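
The overwrite-then-discard behaviour can be modelled with a map from name to the latest task ref. LazyDiscardSketch is a hypothetical stand-in for the runtime's tracking and only illustrates the rule that results from a superseded ref are dropped.

```elixir
defmodule LazyDiscardSketch do
  # A later call with the same name replaces the tracked ref.
  def track(refs, name, ref), do: Map.put(refs, name, ref)

  # A result is delivered only if it comes from the currently tracked ref.
  def route(refs, name, ref, result) do
    case Map.fetch(refs, name) do
      {:ok, ^ref} -> {:deliver, result}
      _other -> :lazy_discard
    end
  end
end

old_ref = make_ref()
new_ref = make_ref()

refs =
  %{}
  |> LazyDiscardSketch.track(:warm_cache, old_ref)
  |> LazyDiscardSketch.track(:warm_cache, new_ref)

stale = LazyDiscardSketch.route(refs, :warm_cache, old_ref, {:ok, :stale})
fresh = LazyDiscardSketch.route(refs, :warm_cache, new_ref, {:ok, :fresh})
```

Because the older task is not killed, any side effects it performs still happen; only its result is ignored. Call cancel_async/2,3 first if the older task should stop.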

Options

  • :timeout — milliseconds; on expiry the task is killed and handle_async/3 receives {:exit, :timeout}.
  • :supervisor — Task.Supervisor name; defaults to Musubi.AsyncSupervisor.

Examples

socket = Musubi.Async.start_async(socket, :warm_cache, fn -> Cache.warm() end)

stream_async(socket, name, fun, opts \\ [])

@spec stream_async(Musubi.Socket.t(), atom(), (-> term()), keyword()) ::
  Musubi.Socket.t()

Composite assign_async/3,4 + Musubi.Stream.stream/4. Spawns a background task whose successful return populates a previously-declared stream slot AND flips the matching socket.assigns.<name> Musubi.AsyncResult to :ok with result: true (the items live in the stream, not in assigns).

The user fun must return one of:

  • {:ok, enumerable} — items are inserted into the stream with no opts.
  • {:ok, enumerable, stream_opts} — items inserted with the given stream/4 options (e.g. [at: 0, limit: -100, reset: true]).
  • {:error, reason} — the assign becomes Musubi.AsyncResult.failed(prior, {:error, reason}) and the stream contents are left untouched.

Any other shape raises ArgumentError inside the task and surfaces as Musubi.AsyncResult.failed(prior, {:exit, ...}).
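
The accepted return shapes can be summarised as a small normalising function. StreamReturnSketch is a hypothetical helper written for illustration, and the error message text is invented; it is not the check Musubi runs internally.

```elixir
defmodule StreamReturnSketch do
  # {:ok, enumerable} is treated as "insert with no stream options".
  def normalize({:ok, items}), do: {:ok, items, []}

  # {:ok, enumerable, stream_opts} passes the options through unchanged.
  def normalize({:ok, items, opts}) when is_list(opts), do: {:ok, items, opts}

  # {:error, reason} leaves the stream untouched and fails the assign.
  def normalize({:error, reason}), do: {:error, reason}

  # Anything else is rejected, mirroring the documented ArgumentError.
  def normalize(other) do
    raise ArgumentError,
          "expected {:ok, enumerable}, {:ok, enumerable, stream_opts} " <>
            "or {:error, reason}, got: #{inspect(other)}"
  end
end

plain = StreamReturnSketch.normalize({:ok, [1, 2]})
with_opts = StreamReturnSketch.normalize({:ok, [1], at: 0, limit: -100})
</imports_placeholder_removed>
```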

Calling stream_async for a name with no matching stream :name, ... declaration raises ArgumentError immediately (before the task is spawned).

Options

  • :reset — true cancels the prior task (if any), re-emits Musubi.AsyncResult.loading(prior) for the assign, and leaves stream contents alone. The user fun decides whether to actually reset the stream by returning {:ok, items, reset: true}.
  • :timeout — milliseconds; on expiry the task is killed and the assign becomes failed(prior, {:exit, :timeout}). Stream untouched.
  • :supervisor — Task.Supervisor name; defaults to Musubi.AsyncSupervisor.

Examples

socket = Musubi.Async.stream_async(socket, :messages, fn -> {:ok, fetch_messages()} end)
socket = Musubi.Async.stream_async(socket, :messages, fn -> {:ok, items, at: 0, limit: -100} end)
socket = Musubi.Async.stream_async(socket, :messages, fun, reset: true, timeout: 5_000)