Sync Engine Part III - Batteries Included

Mix.install([
  {:phoenix_playground, "~> 0.1"},
  {:phoenix_ecto, "~> 4.7"},
  {:ecto_foundationdb, "~> 0.6"}
])

Intro

Using FoundationDB Watches, we can set up a lightweight Sync Engine for database reads. It will automatically propagate new data to all mounted LiveViews with push-messaging delivered from the database directly to the LiveView process, without using PubSub.

This is Part III, where we capture the ideas from Parts I and II into a module that removes as much boilerplate as possible, and keeps your LiveView lean, while maintaining clear data access semantics.

Setup Ecto

First, we set up Ecto, defining a schema, and starting the Repo. Our app will have a product catalog and each Product may have zero or more Reviews.

defmodule Product do
  @moduledoc """
  Ecto schema for a catalog product, stored in the `"products"` source.

  The primary key `:id` is a `:binary_id` supplied by the caller; it is
  not autogenerated.
  """

  use Ecto.Schema

  @primary_key {:id, :binary_id, autogenerate: false}

  # Fields a caller is allowed to change through `changeset/2`.
  @castable_fields [:name, :description]

  schema "products" do
    field :name, :string
    field :description, :string
    timestamps()
  end

  @doc """
  Returns a changeset for `product` that casts `:name` and `:description`
  from `params`. No validations are applied.
  """
  def changeset(product, params \\ %{}),
    do: Ecto.Changeset.cast(product, params, @castable_fields)
end

defmodule Review do
  @moduledoc """
  Ecto schema for a product review, stored in the `"reviews"` source.

  Each review belongs to a `Product` through a `:binary_id` foreign key.
  The review's own `:id` is an autogenerated `:binary_id`.
  """

  use Ecto.Schema

  @primary_key {:id, :binary_id, autogenerate: true}

  # Fields a caller is allowed to change through `changeset/2`.
  @castable_fields [:author, :content, :score]

  schema "reviews" do
    belongs_to :product, Product, type: :binary_id
    field :author, :string
    field :content, :string
    field :score, :integer
    timestamps()
  end

  @doc """
  Returns a changeset for `review` that casts `:author`, `:content` and
  `:score` from `params`. No validations are applied.
  """
  def changeset(review, params \\ %{}),
    do: Ecto.Changeset.cast(review, params, @castable_fields)
end

defmodule Migration1 do
  @moduledoc """
  First migration: declares the metadata entries and indexes used by the
  `Product` and `Review` schemas.
  """

  use EctoFoundationDB.Migration

  @impl true
  def change do
    product_ops = [
      create(metadata(Product)),
      create(index(Product, [:name]))
    ]

    review_ops = [
      create(metadata(Review)),
      create(index(Review, [:product_id, :inserted_at])),
      create(metadata(Review, [:product_id]))
    ]

    # Product operations first, then Review operations.
    product_ops ++ review_ops
  end
end

defmodule Repo do
  @moduledoc """
  Ecto repository backed by the FoundationDB adapter, configured under the
  `:my_app` OTP application.
  """

  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.FoundationDB
  use EctoFoundationDB.Migrator

  # Versioned list of migration modules, as `{version, module}` pairs.
  @migrations [{1, Migration1}]

  @impl true
  def migrations, do: @migrations
end

# ------------------------------------------------------------------
# Livebook-only bootstrapping. In a regular application this lives in
# your config files and supervision tree instead.
repo_env = [
  open_db: &EctoFoundationDB.Sandbox.open_db/1,
  storage_id: EctoFoundationDB.Sandbox
]

Application.put_env(:my_app, Repo, repo_env)

{:ok, _started} = Ecto.Adapters.FoundationDB.ensure_all_started(Repo.config(), :temporary)
Repo.start_link(log: false)
# ------------------------------------------------------------------

alias EctoFoundationDB.Tenant

# Start from an empty tenant every time this cell is evaluated.
tenant_id = "sync-sample"
Tenant.clear_delete!(Repo, tenant_id)
tenant = Tenant.open_empty!(Repo, tenant_id)

Next, we create a product and some reviews by inserting into the database.

# Insert the cereal product and its two reviews inside one transaction,
# returning the inserted product and the list of inserted reviews.
{cereal, cereal_reviews} =
  Repo.transactional(tenant, fn ->
    product =
      Repo.insert!(%Product{
        id: "p1",
        name: "Glo-Grain Cereal",
        description: "A breakfast cereal made from bioluminescent oats that naturally glows a soft neon blue when submerged in milk."
      })

    pending_reviews = [
      %Review{
        product: product,
        author: "Sarah J.",
        content: "The kids love the glow, but it makes the milk look like radioactive sludge. Tastes like honey and marshmallows though!",
        score: 3
      },
      %Review{
        product: product,
        author: "Marcus T.",
        content: "Finally, a cereal I can eat in the dark while watching movies without spilling. 10/10 for utility.",
        score: 5
      }
    ]

    {product, Enum.map(pending_reviews, &Repo.insert!/1)}
  end)

Let's insert one more product into the database.

# Insert the headphones product and its two reviews inside one transaction,
# returning the inserted product and the list of inserted reviews.
{headphones, headphones_reviews} =
  Repo.transactional(tenant, fn ->
    product =
      Repo.insert!(%Product{
        id: "p2",
        name: "Echo-Free Headphones",
        description: "High-end audio gear that uses \"Inverse Silence\" technology to make the wearer completely invisible to bats and sonar-based equipment."
      })

    pending_reviews = [
      %Review{
        product: product,
        author: "TechGeek99",
        content: "The sound quality is crisp, but I haven't been chased by a bat yet, so I can't verify the main feature.",
        score: 5
      },
      %Review{
        product: product,
        author: "NatureLover",
        content: "A bit bulky for hiking, but the noise cancellation is so good I forgot I was outside.",
        score: 1
      }
    ]

    {product, Enum.map(pending_reviews, &Repo.insert!/1)}
  end)

Setup LiveView

With the database setup, we can move on to the LiveView page.

First, we'll define a simple CSS stylesheet for our page. The values chosen here are not important for this demo.

defmodule Style do
  @moduledoc """
  Holds the demo's CSS stylesheet as a single `<style>` element string.
  """

  # The stylesheet is a constant, so it is built once at compile time.
  @stylesheet """
  <style type="text/css">
    /* Global Resets & Layout */
    body { font-family: sans-serif; line-height: 1.5; color: #333; }

    /* Navigation Bar */
    .catalog ul {
      display: flex;
      justify-content: center;
      gap: 20px;
      padding: 1rem;
      list-style: none;
      background: #f4f4f4;
      border-bottom: 1px solid #ddd;
    }
    .catalog a { text-decoration: none; color: #007bff; font-weight: bold; }
    .catalog a:hover { text-decoration: underline; }

    /* Product Card */
    .card {
      max-width: 500px;
      margin: 2rem auto;
      padding: 2rem;
      border: 1px solid #eee;
      border-radius: 8px;
      box-shadow: 0 4px 6px rgba(0,0,0,0.1);
    }

    .card h1 { text-align: center; margin-top: 0; }
    .card p { text-align: center; color: #666; }

    /* Reviews Section */
    .card ul { padding: 0; list-style: none; }
    .card figure {
      margin: 1.5rem 0;
      padding: 1rem;
      background: #fafafa;
      border-left: 4px solid #007bff;
    }
    blockquote { margin: 0; font-style: italic; }
    figcaption { text-align: right; font-weight: bold; font-size: 0.9rem; }

      /* Form Layout */
    .card form {
      display: flex;
      flex-direction: column;
      gap: 15px; /* Adds space between inputs and button */
      margin-top: 1rem;
    }

    /* Input Styling */
    .card input[type="text"] {
      width: 100%;
      padding: 10px;
      border: 1px solid #ccc;
      border-radius: 4px;
      font-size: 1rem;
      box-sizing: border-box; /* Ensures padding doesn't break width */
    }

    .card input[type="text"]:focus {
      outline: none;
      border-color: #007bff;
      box-shadow: 0 0 0 2px rgba(0,123,255,0.25);
    }

    /* Button Styling */
    .card button {
      padding: 10px 20px;
      background-color: #007bff;
      color: white;
      border: none;
      border-radius: 4px;
      font-weight: bold;
      cursor: pointer;
      transition: background 0.2s;
    }

    .card button:hover {
      background-color: #0056b3;
    }

    /* Label Styling */
    .card label {
      display: block;
      font-weight: bold;
      font-size: 0.85rem;
      color: #555;
      margin-bottom: -10px; /* Pulls the label closer to its specific input */
      text-transform: uppercase;
      letter-spacing: 0.5px;
    }

    /* Add a bit of extra space between form groups */
    .card input + label {
      margin-top: 10px;
    }

    .page-title {
      text-align: center;
      margin: 2rem 0;
      font-size: 2rem;
      color: #222;
      text-transform: uppercase;
      letter-spacing: 1px;
    }
  </style>
  """

  @doc """
  Returns the stylesheet as a binary containing one `<style>` element.
  """
  def style, do: @stylesheet
end

With the stylesheet out of the way, we can showcase how the Sync module works in your LiveView.

defmodule DemoLive do
  @moduledoc """
  Catalog page: a navigation bar listing every product plus, when an `id`
  query parameter is present, a card with that product and its reviews.

  All three assigns (`:catalog`, `:product`, `:reviews`) are populated via
  `EctoFoundationDB.Sync`.
  """

  use Phoenix.LiveView

  alias EctoFoundationDB.Sync
  import Ecto.Query

  # Every product, ordered by name. Feeds the navigation bar.
  @query_catalog from(p in Product, order_by: p.name)

  # Reviews, newest first. Constrained further to a single product
  # (by `product_id`) when it is synced in `handle_params/3`.
  @query_reviews from(r in Review, order_by: {:desc, r.inserted_at})

  @impl true
  def mount(_params, _session, socket) do
    tenant = Tenant.open!(Repo, "sync-sample")

    # The tenant is stored in the socket's private data, and the product
    # list is synced into the `:catalog` assign.
    socket =
      socket
      |> put_private(:tenant, tenant)
      |> Sync.sync_all(Repo, :catalog, @query_catalog)

    {:ok, socket}
  end

  @impl true
  def handle_params(%{"id" => id}, _uri, socket) do
    # A specific product is being viewed:
    #   * the product itself is synced into `:product`
    #   * its reviews (`@query_reviews` constrained by `product_id: id`)
    #     are synced into `:reviews`
    socket =
      socket
      |> Sync.sync_one(Repo, :product, Product, id)
      |> Sync.sync_all_by(Repo, :reviews, @query_reviews, product_id: id)

    {:noreply, socket}
  end

  def handle_params(_params, _uri, socket) do
    # No product selected: render the placeholder card.
    {:noreply, assign(socket, product: nil, reviews: [])}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <h1 class="page-title">Catalog</h1>
    <div class="catalog">
      <ul>
        <li :for={p <- @catalog}>
          <.link patch={"/?id=#{p.id}"}>{p.name}</.link>
        </li>
      </ul>
    </div>
    <div :if={@product} class="card">
      <h1>{@product.name}</h1>
      <p>{@product.description}</p>
      <ul>
        <li :for={r <- @reviews}>
          <figure>
            <blockquote>({ r.score } stars) { r.content }</blockquote>
            <h3><figcaption>- { r.author }</figcaption></h3>
          </figure>
        </li>
      </ul>
    </div>
    <div :if={is_nil(@product)} class="card">
    Choose a product to view.
    </div>

    {Phoenix.HTML.raw(Style.style())}
    """
  end
end

# Start PhoenixPlayground with DemoLive passed as its `:live` option.
PhoenixPlayground.start(live: DemoLive)

Now we can add or change anything in our database, and the page will update.

# Insert a third product directly against the tenant (no explicit
# transaction); the result is kept in `seeds` for later cells.
seeds =
  Repo.insert!(
    %Product{
      id: "p3",
      name: "Instant-Tree Seeds",
      description: "A small packet of genetically modified seeds that grows into a fully matured 6-foot oak tree within exactly 45 seconds of watering."
    },
    prefix: tenant
  )

# Bang variant, like every other insert in this notebook: a failed insert
# raises here instead of returning an `{:error, _}` tuple nobody inspects.
Repo.insert!(
  %Review{
    product: seeds,
    author: "GreenThumb",
    content: "Warning: Do not plant these indoors. I lost a ceiling fan and my dignity in less than a minute.",
    score: 3
  },
  prefix: tenant
)

The syncing works by attaching a hook to your LiveView. When the FDB watch is resolved, the database client sends a message to the process that initiated the watch (your LiveView), and the :handle_info hook updates the assigns automatically, according to the label(s) provided to the Sync module.

An Admin page with Dependent Syncing

Next, we'll create an admin page for the products. The admin will be able to update any product's details from a single page. We'll use a different syncing approach this time: Dependent Syncing.

In the example above, we used sync_all_by for the list of Reviews. The functions sync_all and sync_all_by have an option called :watch_action that defaults to :changes. This means that a change to any review in the list will trigger the FDB watch for the entire list. This may be a reasonable choice for our reviews, but on our admin page, a single product might be changing frequently. We don't want to update the entire list every time!

To solve this, 2 sync operations can be chained together. Our steps will roughly be:

  1. Watch the list of product ids that we wish to display: sync_all(..., watch_action: :collection)
  2. Watch each product individually: sync_many()

This ends up being 2 queries on the database on mount/3. However, with this small upfront cost, we have split our syncing notifications into logical groups:

  1. The list of ids will only be changed upon inserts and deletes. This behavior comes from watch_action: :collection.
  2. When a Product's internal data changes, the LiveView will be updated for that item specifically, and all others remain unchanged.

Let's get started. We're going to add another route to our phoenix app to support the Admin page, so let's restart the :phoenix_playground to get a clean slate.

# Stop and then start the :phoenix_playground OTP application so that the
# next PhoenixPlayground.start/1 call begins from a clean slate.
# Neither return value is matched, so a failure here does not raise.
Application.stop(:phoenix_playground)
Application.start(:phoenix_playground)

We choose to use a LiveComponent for the Product form. We'll define that LiveComponent now. It doesn't interact with Sync at all.

defmodule ProductAdminCard do
  @moduledoc """
  LiveComponent that renders an edit form for a single `Product`.

  Expects a `:product` assign. The form is rebuilt from that product on
  every `update/2`, and submitting it writes the change with
  `Repo.update!/1`. The component does not call `EctoFoundationDB.Sync`.
  """

  use Phoenix.LiveComponent

  @impl true
  def update(assigns, socket) do
    {:ok,
     socket
     |> assign(assigns)
     |> assign(form: to_form(Product.changeset(assigns.product)))}
  end

  @impl true
  def handle_event("save", %{"product" => params}, socket) do
    # Raises (crashing the LiveView process) if the update fails.
    socket.assigns.product
    |> Product.changeset(params)
    |> Repo.update!()

    {:noreply, socket}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="card">
      <.form
        for={@form}
        id={"product-form-#{@product.id}"}
        phx-submit="save"
        phx-target={@myself}
      >
        <label for={input_id(@product, @form[:name])}>Product Name</label>
        <input type="text" id={input_id(@product, @form[:name])} name={@form[:name].name} value={@form[:name].value} />
        <label for={input_id(@product, @form[:description])}>Description</label>
        <input type="text" id={input_id(@product, @form[:description])} name={@form[:description].name} value={@form[:description].value} />
        <button>Save</button>
      </.form>
    </div>
    """
  end

  # Several cards are rendered on one page and every form field has the
  # same base id, so the product id is prefixed to keep DOM ids unique.
  # Both the <label for> and the <input id> use this helper so that the
  # label points at the input it describes.
  defp input_id(product, field), do: "#{product.id}-#{field.id}"
end

Now we're ready to define our LiveView. You'll see that in mount/3, we sync our list of product ids with sync_all, similar to the example above.

But just before that, we attach a callback to Sync on the :handle_assigns event. This callback is invoked any time the Sync module changes the assigns on the socket.

This is how the dependent syncing works.

  1. Whenever the list of product ids changes (insert or delete), the handle_assigns/2 callback is invoked.
  2. In this callback, we use sync_many to always ensure we are watching the full list. The assign made by sync_many is compatible with LiveComponents by using the :key attribute in the HEEx list comprehension.
defmodule DemoAdminLive do
  @moduledoc """
  Admin page that renders one `ProductAdminCard` per product.

  It syncs the list of product ids into `:catalog_ids`, and a
  `:handle_assigns` callback turns that list into a per-product sync
  stored in `:products`.
  """

  use Phoenix.LiveView

  alias EctoFoundationDB.Sync
  import Ecto.Query

  # Only the ids are selected; the products themselves are synced
  # individually in `handle_assigns/2`.
  @query_catalog from(p in Product, select: p.id)

  @impl true
  def mount(_params, _session, socket) do
    tenant = Tenant.open!(Repo, "sync-sample")

    # The callback is attached before the first sync call in the pipeline.
    socket =
      socket
      |> put_private(:tenant, tenant)
      |> Sync.attach_callback(Repo, :handle_assigns, &handle_assigns/2)
      |> Sync.sync_all(Repo, :catalog_ids, @query_catalog, watch_action: :collection)

    {:ok, socket}
  end

  # Callback attached in `mount/3` for the `:handle_assigns` event.
  # NOTE(review): the second argument is matched as a map keyed by assign
  # label; confirm its exact contents against the Sync documentation.
  #
  # When `:catalog_ids` is present, (re)sync every listed product into
  # `:products` and halt. When `:products` is present, continue unchanged.
  defp handle_assigns(socket, %{catalog_ids: _}) do
    ids = socket.assigns.catalog_ids
    {:halt, Sync.sync_many(socket, Repo, :products, Product, ids)}
  end

  defp handle_assigns(socket, %{products: _}) do
    {:cont, socket}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <h1 class="page-title">Admin</h1>
    <div :for={p <- @products} :key={p.id}>
      <.live_component
          module={ProductAdminCard}
          id={p.id}
          product={p} />
    </div>

    {Phoenix.HTML.raw(Style.style())}
    """
  end
end

defmodule DemoRouter do
  @moduledoc """
  Router for the demo: `/` serves `DemoLive` and `/admin` serves
  `DemoAdminLive`, both through the `:browser` pipeline.
  """

  use Phoenix.Router
  import Phoenix.LiveView.Router

  pipeline :browser do
    plug(:accepts, ["html"])
    plug(:fetch_session)
    plug(:put_root_layout, html: {PhoenixPlayground.Layout, :root})
    plug(:put_secure_browser_headers)
  end

  scope "/" do
    pipe_through(:browser)

    live("/", DemoLive)
    live("/admin", DemoAdminLive)
  end
end

# Start PhoenixPlayground with DemoRouter passed as its `:plug` option,
# which makes both the `/` and `/admin` routes available.
PhoenixPlayground.start(plug: DemoRouter)

With just this minimal amount of code, the LiveView will never have stale data, and the watches are fully managed for us, including deletions.

You may now view both the home page of our app and the admin page by opening 2 separate tabs side-by-side:

http://localhost:4000
http://localhost:4000/admin

Finally, let's make some database changes to show off the real-time syncing. The code below will concurrently and repeatedly update all the products in the database with an incremented version number.

While it's running, you can navigate the page normally.

import Ecto.Query

# Snapshot of the product ids at the time this cell runs.
ids = Repo.all(from(p in Product, select: p.id), prefix: tenant)

# Matches a trailing version suffix such as "v12". At least one digit is
# required (`\d+`, not `\d*`): otherwise a name that merely ends in "v"
# matches with an empty capture and `String.to_integer("")` raises.
regex = ~r/v(?<v>\d+)$/

# Picks a random product and, in one transaction, bumps the version
# suffix of its name, or appends " v0" when there is no suffix yet.
incr_vsn = fn _ ->
  Repo.transactional(tenant, fn ->
    p = %{name: name} = Repo.get!(Product, Enum.random(ids))

    name =
      case Regex.named_captures(regex, name) do
        %{"v" => v} -> Regex.replace(regex, name, "v#{String.to_integer(v) + 1}")
        _ -> name <> " v0"
      end

    Repo.update!(Product.changeset(p, %{name: name}))
  end)
end

# 4000 updates, up to 1000 in flight at once; results are discarded.
Task.async_stream(1..4000, incr_vsn, max_concurrency: 1000, timeout: :infinity)
|> Stream.run()

We can also delete and insert. The code block below will delete all products and recreate them, including delays so that you can observe the changes on the page.

# Delete every product, pausing 500 ms after each so the page can be
# watched as it updates.
Enum.each([cereal, headphones, seeds], fn product ->
  Repo.delete!(product, prefix: tenant)
  Process.sleep(500)
end)

# Re-create them in a different order, again pausing between inserts.
Enum.each([cereal, seeds], fn product ->
  Repo.insert!(product, prefix: tenant)
  Process.sleep(500)
end)

Repo.insert!(headphones, prefix: tenant)