Async lifecycle API for Musubi stores. Exposes assign_async/3,4,
start_async/3,4, cancel_async/2,3, and stream_async/3,4.
Public API surface (frozen for M5+)
- assign_async/3,4 - spawn a task whose result writes one or more Musubi.AsyncResult values into socket.assigns.
- start_async/3,4 - spawn a task whose result is delivered to the store's handle_async/3 callback. socket.assigns is not mutated by the call itself.
- cancel_async/2,3 - kill an in-flight task by name/key or by %AsyncResult{} value. Always produces a Musubi.AsyncResult.failed/2 write driven by the resulting :DOWN (or pre-written, when called via the %AsyncResult{} variant).
- stream_async/3,4 - composite assign_async + Musubi.Stream.stream/4: writes a status flag into socket.assigns and seeds the stream slot in one envelope.
Task supervision
Tasks run under Musubi.AsyncSupervisor (a Task.Supervisor started by
Musubi.Application). Pass :supervisor to override per-call:
Musubi.Async.assign_async(socket, :profile, fun, supervisor: MyApp.TaskSup)
Reserved socket-private key
Per-task tracking lives at socket.private[:__musubi_async_refs__] and is
runtime-internal. Use Musubi.Async.private_refs_key/0 to introspect
without hard-coding the literal.
Result classification
| Event | Terminal Musubi.AsyncResult value |
|---|---|
| {:ok, val} | ok(prior, val) |
| {:error, reason} | failed(prior, {:error, reason}) |
| task raises | failed(prior, {:exit, {kind, reason, stacktrace}}) |
| task throws | failed(prior, {:exit, {{:nocatch, val}, st}}) |
| task exits with reason r | failed(prior, {:exit, r}) |
| :timeout fires | failed(prior, {:exit, :timeout}) |
| cancel_async/2,3 with reason r | failed(prior, {:exit, r}) |
| runtime no longer hosts originating node | lazy-discard; [:musubi, :async, :lazy_discard] |
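The first five rows of the table can be read as one classification step around the user fun. The following is a minimal sketch in plain Elixir, not Musubi's implementation: the ClassifySketch module and its :ok / :failed tags are hypothetical, and using :error as the kind for a raised exception is an assumption. Return shapes other than {:ok, val} and {:error, reason} are deliberately left unhandled.

```elixir
defmodule ClassifySketch do
  # Hypothetical helper (not part of Musubi): runs a zero-arity fun and maps
  # its outcome onto the reason shapes listed in the table.
  def classify(fun) when is_function(fun, 0) do
    try do
      fun.()
    rescue
      # Assumption: a raised exception is reported with kind :error.
      exception -> {:failed, {:exit, {:error, exception, __STACKTRACE__}}}
    catch
      :throw, value -> {:failed, {:exit, {{:nocatch, value}, __STACKTRACE__}}}
      :exit, reason -> {:failed, {:exit, reason}}
    else
      # Only reached when the fun returned normally.
      {:ok, value} -> {:ok, value}
      {:error, reason} -> {:failed, {:error, reason}}
    end
  end
end
```

Calling ClassifySketch.classify(fn -> exit(:bye) end) yields {:failed, {:exit, :bye}}, which corresponds to the "task exits with reason r" row.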
Telemetry
Every async event emits [:musubi, :async, :event] with event in
:start | :stop | :exception | :cancel | :lazy_discard. Metadata always
includes page_id, path, name, and kind (:assign | :start | :stream).
See Musubi.Async.Telemetry for the canonical metadata builder.
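As an illustration of consuming these events, here is a minimal sketch of a handler module. AsyncTelemetrySketch and its function names are hypothetical; only the event prefix, the event suffixes, and the four metadata keys come from the text above. The types of page_id and path are not specified here, so the sketch prints them with inspect/1. Attaching the handler needs the :telemetry package, so that call is shown only as a comment.

```elixir
defmodule AsyncTelemetrySketch do
  # Event suffixes as listed in the Telemetry section.
  @events [:start, :stop, :exception, :cancel, :lazy_discard]

  # Full event names, in the list-of-atoms form telemetry handlers use.
  def event_names do
    for event <- @events, do: [:musubi, :async, event]
  end

  # Builds one log line from the metadata keys the section guarantees.
  def format(event, %{page_id: page_id, path: path, name: name, kind: kind}) do
    "async #{event} kind=#{kind} name=#{inspect(name)} " <>
      "page_id=#{inspect(page_id)} path=#{inspect(path)}"
  end

  # 4-arity handler shape expected by :telemetry; prints the formatted line.
  def handle_event([:musubi, :async, event], _measurements, metadata, _config) do
    IO.puts(format(event, metadata))
  end
end

# With the :telemetry package available, attaching would look like:
#   :telemetry.attach_many(
#     "async-log",
#     AsyncTelemetrySketch.event_names(),
#     &AsyncTelemetrySketch.handle_event/4,
#     nil
#   )
```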
Types
@type kind() :: :assign | :start | :stream
Async kind discriminator carried in tracking + telemetry metadata.
@type name_arg() :: term()
User-supplied name argument.
supervisor_ref(): an acceptable Task.Supervisor reference (module name, registered atom, or pid).
@type tracking_entry() :: %{
  ref: reference(),
  pid: pid(),
  kind: kind(),
  keys: [atom()] | nil,
  prior: %{required(atom()) => Musubi.AsyncResult.t()},
  timer_ref: reference() | nil,
  cancel_reason: term() | nil,
  supervisor: supervisor_ref()
}
Per-task tracking entry stored under socket.private[:__musubi_async_refs__].
tracking_name(): the internal name a tracked task is filed under. An atom for start_async/stream_async; the key list for assign_async.
Functions
@spec apply_task_down(Musubi.Socket.t(), tracking_name(), tracking_entry(), term()) :: Musubi.Socket.t()
Resolves a :DOWN message for a tracked task. Called by
Musubi.Page.Server. Writes Musubi.AsyncResult.failed(prior, {:exit, reason})
to every key managed by the tracking entry.
Honors a previously-stamped :cancel_reason (set by cancel_async/2,3 or
:timeout) so the surfaced reason matches the operator-visible cause
rather than the raw Process.exit/2 reason.
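The precedence rule described above can be sketched in a few lines. DownReasonSketch is a hypothetical helper, not a Musubi function; it assumes only that a tracking entry carries a :cancel_reason field that is nil until stamped.

```elixir
defmodule DownReasonSketch do
  # Hypothetical helper: picks the reason to surface for a :DOWN message.
  # A stamped :cancel_reason wins over the raw exit reason from the monitor.
  def surfaced_reason(%{cancel_reason: nil}, down_reason), do: down_reason
  def surfaced_reason(%{cancel_reason: stamped}, _down_reason), do: stamped
end
```

For example, a task killed after a timeout goes down with the raw reason :killed, but an entry stamped with :timeout would surface :timeout instead.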
@spec apply_task_result(Musubi.Socket.t(), tracking_name(), tracking_entry(), term()) :: Musubi.Socket.t()
Applies a classified assign_async/stream_async task result to the
socket. Called by Musubi.Page.Server from handle_info.
classified is the wrapped task return: {:ok, user_return} or
{:exit, reason_class}.
@spec assign_async(Musubi.Socket.t(), name_arg(), (-> term()), keyword()) :: Musubi.Socket.t()
Spawns a background task whose result writes Musubi.AsyncResult values into
socket.assigns for the given key (or list of keys).
Synchronously writes Musubi.AsyncResult.loading(prior) per key before
returning the socket. On task completion the runtime atomically updates each
key to either Musubi.AsyncResult.ok(prior, value) (single-key:
fun returned {:ok, value}; multi-key: fun returned
{:ok, %{key1: v1, key2: v2}}) or Musubi.AsyncResult.failed(prior, reason).
Options
- :reset - true re-emits loading() (with no prior) for every managed key; a list of keys re-emits loading for that subset only. The prior task (if any) is cancelled.
- :timeout - milliseconds; on expiry the task is killed and the result becomes failed(prior, {:exit, :timeout}).
- :supervisor - Task.Supervisor name; defaults to Musubi.AsyncSupervisor.
Examples
socket = Musubi.Async.assign_async(socket, :profile, fn -> {:ok, fetch()} end)
socket = Musubi.Async.assign_async(socket, [:user, :org], fn -> {:ok, %{user: u, org: o}} end)
socket = Musubi.Async.assign_async(socket, :profile, fun, reset: true, timeout: 5_000)
@spec cancel_async(Musubi.Socket.t(), name_arg() | Musubi.AsyncResult.t()) :: Musubi.Socket.t()
Kills an in-flight task and resolves its tracked assigns to
Musubi.AsyncResult.failed(prior, {:exit, reason}).
Three calling shapes:
- cancel_async(socket, name) - by name (atom for start_async/stream_async, atom or key list for assign_async). Kills the pid; the resulting :DOWN message drives the failed write.
- cancel_async(socket, name, reason) - same, with explicit reason.
- cancel_async(socket, %AsyncResult{}, reason) - pre-writes failed/2 synchronously, then kills the task. Use when the caller already holds the %AsyncResult{} and wants the assign updated before returning.
Default reason is {:shutdown, :cancel}.
Emits [:musubi, :async, :cancel].
Examples
socket = Musubi.Async.cancel_async(socket, :profile)
socket = Musubi.Async.cancel_async(socket, :profile, :user_navigated_away)
socket = Musubi.Async.cancel_async(socket, async_result, :user_navigated_away)
@spec cancel_async(Musubi.Socket.t(), name_arg() | Musubi.AsyncResult.t(), term()) :: Musubi.Socket.t()
@spec mark_timeout(Musubi.Socket.t(), tracking_name()) :: {Musubi.Socket.t(), tracking_entry()} | :error
Sets a tracking entry's :cancel_reason to :timeout and returns the entry
so the caller can kill the task pid. Called by Musubi.Page.Server when a
{:musubi_async_timeout, ref} message fires.
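To make the timeout flow concrete, here is a minimal sketch using only the standard library. It is not Musubi.Page.Server's code: TimeoutSketch, the reduced entry map, and the 50 ms delay are hypothetical. The message shape {:musubi_async_timeout, ref} follows the text above, and Task.Supervisor.async_nolink/2 is used because it monitors the task without linking to it.

```elixir
defmodule TimeoutSketch do
  # Hypothetical walk-through: arm a timer, stamp :timeout on the entry,
  # kill the task, then read the raw reason from the :DOWN message.
  def run do
    {:ok, sup} = Task.Supervisor.start_link()
    task = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)
    ref = task.ref

    # Reduced tracking entry with only the fields this sketch needs.
    entry = %{ref: ref, pid: task.pid, cancel_reason: nil}
    Process.send_after(self(), {:musubi_async_timeout, ref}, 50)

    entry =
      receive do
        {:musubi_async_timeout, ^ref} ->
          # Stamp the cause first, then kill the task pid.
          stamped = %{entry | cancel_reason: :timeout}
          Process.exit(stamped.pid, :kill)
          stamped
      after
        1_000 -> raise "timer message never arrived"
      end

    raw_reason =
      receive do
        {:DOWN, ^ref, :process, _pid, reason} -> reason
      after
        1_000 -> raise ":DOWN never arrived"
      end

    # The stamped reason is what gets surfaced, not the raw exit reason.
    {raw_reason, entry.cancel_reason || raw_reason}
  end
end
```

TimeoutSketch.run() returns {:killed, :timeout}: the monitor reports :killed, while the stamped entry carries the cause a reader of the assign would expect.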
@spec private_refs_key() :: :__musubi_async_refs__
Returns the reserved socket-private key holding async-task tracking entries.
@spec start_async(Musubi.Socket.t(), atom(), (-> term()), keyword()) :: Musubi.Socket.t()
Spawns a background task whose result is routed to the store's
handle_async(name, result, socket) callback.
socket.assigns is not mutated by this call — applications that want a
visible loading indicator should write one explicitly (typically by also
calling assign_async/3,4 for the same data).
A second start_async/3,4 with the same name silently overwrites the
prior tracking ref (BDR-0019). The older task continues running and its
result is lazy-discarded on arrival, accompanied by a
[:musubi, :async, :lazy_discard] telemetry event.
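The overwrite-then-discard behaviour can be sketched with a plain map of refs. LazyDiscardSketch and its return tags are hypothetical names for illustration; the sketch assumes only what the paragraph above states, namely that the newest ref for a name replaces the older one and that a result arriving under a stale ref is dropped.

```elixir
defmodule LazyDiscardSketch do
  # Hypothetical tracking table: name => ref of the most recent task.

  # Registering a name again silently replaces the older ref.
  def track(refs, name, ref), do: Map.put(refs, name, ref)

  # A result is delivered only when its ref is still the tracked one;
  # anything else is treated as stale and dropped.
  def route(refs, name, ref, result) do
    case Map.fetch(refs, name) do
      {:ok, ^ref} -> {:deliver, result, Map.delete(refs, name)}
      _stale_or_missing -> {:lazy_discard, refs}
    end
  end
end
```

With two refs tracked in sequence under :warm_cache, a result carrying the first ref returns {:lazy_discard, refs}, and a result carrying the second is delivered.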
Options
- :timeout - milliseconds; on expiry the task is killed and handle_async/3 receives {:exit, :timeout}.
- :supervisor - Task.Supervisor name; defaults to Musubi.AsyncSupervisor.
Examples
socket = Musubi.Async.start_async(socket, :warm_cache, fn -> Cache.warm() end)
@spec stream_async(Musubi.Socket.t(), atom(), (-> term()), keyword()) :: Musubi.Socket.t()
Composite assign_async/3,4 + Musubi.Stream.stream/4. Spawns a background
task whose successful return populates a previously-declared stream slot
AND flips the matching socket.assigns.<name> Musubi.AsyncResult to
:ok with result: true (the items live in the stream, not in assigns).
The user fun must return one of:
- {:ok, enumerable} - items are inserted into the stream with no opts.
- {:ok, enumerable, stream_opts} - items inserted with the given stream/4 options (e.g. [at: 0, limit: -100, reset: true]).
- {:error, reason} - the assign becomes Musubi.AsyncResult.failed(prior, {:error, reason}) and the stream contents are left untouched.
Any other shape raises ArgumentError inside the task and surfaces as
Musubi.AsyncResult.failed(prior, {:exit, ...}).
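The accepted return shapes can be expressed as one normalizing function. This is a sketch only: StreamReturnSketch is a hypothetical module, the error message text is invented, and the sketch does not check that the items are actually enumerable.

```elixir
defmodule StreamReturnSketch do
  # Hypothetical normalizer for the return shapes listed above.

  # No options given: equivalent to an empty option list.
  def normalize({:ok, items}), do: {:ok, items, []}

  # Options are passed through unchanged for the stream insert.
  def normalize({:ok, items, opts}) when is_list(opts), do: {:ok, items, opts}

  # Errors leave the stream alone and only affect the assign.
  def normalize({:error, reason}), do: {:error, reason}

  # Anything else is rejected, mirroring the ArgumentError described above.
  def normalize(other) do
    raise ArgumentError, "unexpected stream_async return: #{inspect(other)}"
  end
end
```

Note that {:ok, items, at: 0, limit: -100} is ordinary Elixir syntax for a three-element tuple whose last element is a keyword list, so it matches the second clause.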
Calling stream_async for a name with no matching stream :name, ...
declaration raises ArgumentError immediately (before the task is spawned).
Options
- :reset - true cancels the prior task (if any), re-emits Musubi.AsyncResult.loading(prior) for the assign, and leaves stream contents alone. The user fun decides whether to actually reset the stream by returning {:ok, items, reset: true}.
- :timeout - milliseconds; on expiry the task is killed and the assign becomes failed(prior, {:exit, :timeout}). Stream untouched.
- :supervisor - Task.Supervisor name; defaults to Musubi.AsyncSupervisor.
Examples
socket = Musubi.Async.stream_async(socket, :messages, fn -> {:ok, fetch_messages()} end)
socket = Musubi.Async.stream_async(socket, :messages, fn -> {:ok, items, at: 0, limit: -100} end)
socket = Musubi.Async.stream_async(socket, :messages, fun, reset: true, timeout: 5_000)