Grove.Session (Grove v0.1.1)

View Source

GenServer managing a collaborative document session.

Sessions provide lifecycle management for collaborative documents:

  • Dynamic start via Registry + DynamicSupervisor
  • Subscriber tracking with automatic crash detection
  • Grace period before shutdown when all subscribers leave
  • Periodic persistence to storage
  • Delta broadcasting via PubSub

Usage

# Start or get existing session
{:ok, pid} = Grove.Session.get_or_start("doc_123", pubsub: MyApp.PubSub)

# Join the session (returns current tree state)
{:ok, tree} = Grove.Session.join(pid)

# Mutate the tree and broadcast delta
:ok = Grove.Session.mutate(pid, fn tree ->
  updated = Grove.Tree.update_node(tree, "node_1", &update_fn/1)
  delta = {:update_attrs, "node_1", %{value: "new"}}
  {updated, delta}
end)

# Leave session when done
:ok = Grove.Session.leave(pid)

LiveView Integration

def mount(%{"id" => doc_id}, _session, socket) do
  {:ok, session_pid} = Grove.Session.get_or_start(doc_id, pubsub: MyApp.PubSub)
  {:ok, tree} = Grove.Session.join(session_pid)

  if connected?(socket) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, Grove.Session.topic(doc_id))
  end

  socket = assign(socket, grove_session: session_pid, grove_tree: tree)
  {:ok, socket}
end

def terminate(_reason, socket) do
  if pid = socket.assigns[:grove_session] do
    Grove.Session.leave(pid)
  end
end

Options

  • :pubsub - Required. Phoenix.PubSub module for broadcasting.
  • :tree - Optional. Initial tree state. Defaults to empty tree.
  • :grace_period - Optional. Milliseconds to wait before shutting down once all subscribers have left. Default: 30_000.
  • :snapshot_interval - Optional. Milliseconds between periodic persistence to storage. Default: 60_000. Set to nil to disable periodic persistence.
  • :storage - Optional. Storage module (e.g., Grove.Storage.ETS).
  • :storage_ref - Optional. Storage reference for persistence.
  • :use_document_storage - Optional. Use snapshot + event log persistence. Default: true. Set to false for legacy full-tree persistence.

Summary

Functions

Returns a specification to start this module under a supervisor.

Gets an existing session or starts a new one for the document.

Gets the current tree state.

Returns the operation history for the document.

Gets session information.

Joins a session, registering the calling process as a subscriber.

Leaves a session, unregistering the calling process as a subscriber.

Mutates the tree and broadcasts the delta to all subscribers.

Stops a session immediately (for testing/cleanup).

Returns the PubSub topic for a document.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_or_start(document_id, opts \\ [])

@spec get_or_start(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

Gets an existing session or starts a new one for the document.

Returns {:ok, pid} on success.

Options

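  • :pubsub - Required. Phoenix.PubSub module used for broadcasting.
  • :tree - Optional. Initial tree state.
  • :grace_period - Optional. Milliseconds to wait before shutting down once all subscribers have left.
  • :snapshot_interval - Optional. Milliseconds between periodic persistence to storage.
  • :storage - Optional. Storage module.
  • :storage_ref - Optional. Storage reference.
  • :use_document_storage - Optional. Use snapshot + event log persistence.

See the module-level Options section for default values.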

get_state(session)

@spec get_state(pid() | String.t()) :: {:ok, Grove.Tree.t()}

Gets the current tree state.

history(session, opts \\ [])

@spec history(
  pid() | String.t(),
  keyword()
) :: {:ok, [Grove.Tree.HistoryEntry.t()]}

Returns the operation history for the document.

Delegates to Grove.Tree.history/2. See that function for available options.

Examples

{:ok, history} = Grove.Session.history(session)
{:ok, history} = Grove.Session.history(session, since: timestamp, limit: 10)
{:ok, history} = Grove.Session.history(session, where: [user_id: "u1"])

info(session)

@spec info(pid() | String.t()) :: map()

Gets session information.

Returns a map with:

  • :document_id - The document ID
  • :subscriber_count - Number of active subscribers
  • :tree_size - Number of nodes in the tree

join(session, subscriber_pid \\ self())

@spec join(pid() | String.t(), pid()) :: {:ok, Grove.Tree.t()} | {:error, term()}

Joins a session, registering subscriber_pid (the calling process by default) as a subscriber.

Returns {:ok, tree} with the current tree state.

leave(session, subscriber_pid \\ self())

@spec leave(pid() | String.t(), pid()) :: :ok

Leaves a session, unregistering subscriber_pid (the calling process by default) as a subscriber.

If this is the last subscriber, starts the grace period timer.

mutate(session, fun)

@spec mutate(pid() | String.t(), (Grove.Tree.t() -> {Grove.Tree.t(), term()})) :: :ok

Mutates the tree and broadcasts the delta to all subscribers.

The function receives the current tree and must return {updated_tree, delta}: updated_tree replaces the session's tree, and delta is broadcast to subscribers.

start_link(opts)

stop(session)

@spec stop(pid() | String.t()) :: :ok

Stops a session immediately (for testing/cleanup).

topic(document_id)

@spec topic(String.t()) :: String.t()

Returns the PubSub topic for a document.