Observables
Copy MarkdownAn observable is a GenServer that pushes state updates to every subscribed component
the moment something changes. Instead of polling or manually sending messages, you
call notify_observers/1 after a mutation and all interested components re-render
automatically.
The key feature is projections: a subscriber provides a function that extracts only the slice of state it cares about. If the projected result equals what the component last rendered, the update is suppressed — no re-render. This is the change-or-bust optimization that keeps large UIs fast.
Projections run client-side (in the component fiber), not on the server. The server sends raw state to subscribers; each subscriber's projection fn is then applied locally. This means projection functions can safely close over local component state without any coordination with the server.
This guide uses the Cart & Checkout example from examples/cart. By the end you will
understand Observable.GenServer, use_observable/2, the change-or-bust mechanism,
and how to test observable components.
The Observable.GenServer macro
use Filament.Observable.GenServer turns any GenServer into one that Filament
components can subscribe to. Here is the real Cart.Server:
defmodule Cart.Server do
use Filament.Observable.GenServer
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, %Cart.State{}, name: name)
end
# Called when a new component subscribes.
# Return {:ok, initial_value, new_state} to accept.
@impl Filament.Observable
def handle_subscribe(_subscriber, state) do
{:ok, state, state}
end
@impl GenServer
def handle_call({:add_item, item}, _from, state) do
new_state = Cart.State.add_item(state, item)
notify_observers(new_state) # push raw state to all subscribers
{:reply, :ok, new_state}
end
@impl GenServer
def handle_call({:remove_item, item_id}, _from, state) do
new_state = Cart.State.remove_item(state, item_id)
notify_observers(new_state)
{:reply, :ok, new_state}
end
endWhat the macro injects:
handle_call({:filament_subscribe, ...})— registers a subscriber and monitors its process. Calls yourhandle_subscribe/2callback.handle_cast({:filament_unsubscribe, ...})— removes a subscriber.handle_info({:DOWN, ...})— automatically drops subscribers whose LiveView process terminates.notify_observers/1— call this after every mutation. It compares the new raw state against the last raw state sent to each subscriber and delivers{:filament_observable_updates, ...}only when the raw state changed.
The default handle_subscribe/2 returns {:ok, state, state} (the current state
as the initial value). Override it to reject subscriptions or return a different
initial value.
Subscribing from a component: use_observable/2
use_observable/2 takes a server reference and a projection function. The projection
function receives either :disconnected (before the WebSocket is established) or the
raw state broadcast by the server. The CartBadge component receives the server as a
prop and projects the item count:
defmodule CartWeb.Components.CartBadge do
use Filament.Component
defcomponent do
prop(:server, :any, default: nil)
def render(%{server: server}) do
count =
use_observable(server, fn
:disconnected -> 0
s -> Cart.State.item_count(s)
end)
~F"""
<span class="cart-badge" data-count={count}>
{if count == 0, do: "", else: "#{count}"}
</span>
"""
end
end
endThe parent component resolves the server once and passes it as a prop:
def render(%{session_id: session_id}) do
server = use_observable(fn -> Cart.Server.ensure_started(session_id) end)
~F"""
<CartBadge server={server} />
<CartItems server={server} />
"""
enduse_observable/1 (factory fn or pid, no projection) returns the resolved pid
(or nil during disconnected renders). use_observable/2 calls the projection
function with :disconnected on the first HTTP render, letting it return a safe
default.
Or use a sentinel to branch on the disconnected case:
cart = use_observable(server, fn :disconnected -> nil; s -> s end)
if cart == nil, do: render_loading(), else: render_cart(cart)On subsequent renders (WebSocket-connected), the hook applies the projection to the latest raw state received from the server.
Server lifecycle with a factory function
When the component owns the server's lifecycle, pass a factory function directly to
use_observable/1:
store = use_observable(fn -> Todo.Store.start_link([]) end)
todos = use_observable(store, fn
:disconnected -> []
s -> s
end)The server starts when the component mounts and can stop itself in
handle_unsubscribe/2 when the last subscriber leaves:
@impl Filament.Observable
def handle_unsubscribe(_subscriber, state) do
# Stop when the last subscriber (component) unmounts:
{:stop, :normal, state}
# Or keep running:
# {:ok, state}
endThis eliminates the need to start the server in mount/3 and thread it as a prop —
the LiveView reduces to:
defmodule TodoWeb.TodoLive do
use Filament.LiveView
def root_component, do: TodoWeb.Components.TodoList
endOn the first render (HTTP pre-connect), use_observable/1 returns nil because
subscribing during an HTTP render would create zombie subscribers. use_observable/2
calls the projection function with :disconnected instead.
Projections and change-or-bust
Because projections run client-side, the server only tracks one piece of state per
subscriber: the last raw state it sent to that subscriber. When
notify_observers/1 is called:
- For each subscriber, compare
new_state !== last_raw. - If equal, skip — the subscriber already has this raw state.
- If different, deliver the raw state to the subscriber process and update
last_raw. - The subscriber fiber applies its projection function (with the current closure) and updates the component only if the projected result also changed.
Consider two components subscribed to the same Cart.Server:
CartBadgeprojects withfn s -> Cart.State.item_count(s) endCartViewuses identity (receives the fullCart.State)
When a user changes the price of an item without adding or removing it:
Cart.Servercallsnotify_observers(new_state).- Both subscribers receive the new raw state (it differs from their
last_raw). CartView: projected output differs → re-render.CartBadge:item_count(new_state) == item_count(last_state)(count unchanged) → update suppressed → no re-render.
Filament uses strict inequality (!==) for both comparisons. Primitives and atoms
compare by value; maps and structs compare by identity. If your projection returns
a map you should return the same struct whenever the relevant fields haven't changed.
The projection test from examples/cart/test/cart_test.exs demonstrates this
directly:
test "projection suppresses update when count is unchanged" do
{:ok, stub} = Filament.Test.Stub.start(fn -> %Cart.State{} end)
sub = %Filament.Observable.Subscriber{
pid: self(),
fiber_id: :badge_test_fiber,
slot_index: 0
}
{:ok, _initial} = Filament.Observable.subscribe(stub, sub)
# Push a state with count 0 → 1
state1 =
Cart.State.add_item(
%Cart.State{},
%Cart.Item{id: "a", name: "A", price_cents: 100, quantity: 1}
)
Filament.Test.Stub.push(stub, state1)
assert_receive {:filament_observable_updates, [_]}, 500
# Push the same state again — raw state unchanged, no notification
Filament.Test.Stub.push(stub, state1)
refute_receive {:filament_observable_updates, _}, 100
endhandle_subscribe return values
{:ok, initial_value, new_state}— accept the subscription;initial_valueis the raw state the client receives immediately (used as the seed for change-or-bust tracking and passed through the projection fn for the first render).{:error, reason, new_state}— reject the subscription; raisesFilament.ObservableErrorin the component.
Presence tracking and static_subscribe
use Filament.LiveView defaults to static_subscribe: true, which subscribes during
the initial HTTP render so the page arrives at the browser with real data already
populated. This is good for most read-only projections (item counts, document content,
etc.) because users see meaningful content before the WebSocket connects.
For presence tracking it is the wrong default. Here is why:
Phoenix LiveView uses two separate OS processes per tab: a short-lived HTTP process for the static render, and a long-lived WebSocket process for the connected session. Filament's session-handoff mechanism ensures these two processes share one subscriber slot, so presence normally stays correct. However, on a page reload there is a window where the departing WS connection and the arriving static render overlap — both are alive simultaneously, so presence briefly spikes by one before the old WS tears down.
The fix is simple:
use Filament.LiveView, static_subscribe: falseWith this setting, use_observable/2 returns the :disconnected value on the HTTP
render (so you might show "Connecting…" or 0 initially), then re-renders with live
data the moment the WebSocket connects. Because no subscription is made during the
static render, presence only ever counts real WebSocket connections — the spike
disappears entirely.
Rule of thumb: use static_subscribe: true (the default) when the projected value
is useful in the initial HTML (cart totals, document content). Use
static_subscribe: false when the projection represents who is connected rather than
what the data is — presence counts, online indicators, and live cursors all fall into
this category.
Mutations from event closures
CartView handles item removal via a Phoenix event, but the pattern generalises to
any mutation. The flow is:
- User interaction triggers a call to
Cart.Server.remove_item/2. - The server runs
notify_observers(new_state). - Filament delivers raw state updates to each subscriber whose
last_rawdiffers. - Each subscribed component's fiber applies its projection and re-renders if the projected value changed.
You do not need to do anything special in the component — just call the server and let the observer push the update.
Testing with rung-3
Rung-3 tests use a real GenServer (not a stub) and Filament.Test.update/1 to
drain the observable update message and re-render:
describe "CartView (rung-3, real Cart.Server)" do
setup do
server = start_supervised!(Cart.Server)
{:ok, view} = mount(CartWeb.Components.CartView, %{server: server})
%{server: server, view: view}
end
test "add_item updates rendered view", %{server: server, view: view} do
Cart.Server.add_item(server, %Cart.Item{
id: "w1",
name: "Widget",
price_cents: 999,
quantity: 1
})
view = Filament.Test.update(view)
assert render_text(view) =~ "Widget"
end
test "eventually/2 retries until cart is updated asynchronously", %{
server: server,
view: view
} do
spawn(fn ->
Process.sleep(50)
Cart.Server.add_item(server, %Cart.Item{
id: "async1",
name: "AsyncItem",
price_cents: 100,
quantity: 1
})
end)
view_ref = make_ref()
Process.put(view_ref, view)
Filament.Test.eventually(
fn ->
current = Process.get(view_ref)
updated = Filament.Test.update(current)
Process.put(view_ref, updated)
String.contains?(render_text(updated), "AsyncItem")
end,
timeout: 500
)
end
endKey test helpers:
Filament.Test.update(view)— drains one pending observable update message and re-renders the affected fiber. Returns an updated view struct.Filament.Test.Stub.start(fn -> initial_state end)— creates an in-process observable stub for rung-2 isolation tests.Filament.Test.Stub.push(stub, new_state)— pushes a raw state update through the stub.Filament.Test.eventually(fn -> bool end, timeout: ms)— retries the predicate until it returnstrueor the timeout expires. Useful for asynchronous mutations.
Observable contract
See Filament.Observable for the full @callback specifications including the
handle_unsubscribe/2 cleanup callback.
Next steps
- Hooks guide — learn how to compose hooks and build custom hooks like
use_hold(seeexamples/inventory/lib/inventory_web/hooks.exfor a worked example of resource holds built on top ofuse_observable). - API reference — see
Filament.Observable,Filament.Observable.GenServer, andFilament.Hooks(use_observable/1,use_observable/2) for full signatures.