LiveView-native client for tracking flow and job runs in real-time.
Provides functions to start flows, subscribe to existing runs, and handle
PubSub updates — all operating on %PgFlow.Schema.Run{} structs stored
in socket assigns.
Usage
defmodule MyAppWeb.FlowLive do
use MyAppWeb, :live_view
alias PgFlow.LiveClient
def mount(_params, _session, socket) do
{:ok, LiveClient.init(socket, pubsub: MyApp.PubSub)}
end
def handle_event("start", %{"url" => url}, socket) do
case LiveClient.start_flow(socket, :article_flow, %{"url" => url}) do
{:ok, socket} -> {:noreply, socket}
{:error, _reason, socket} -> {:noreply, socket}
end
end
def handle_info({:pgflow, _, _} = msg, socket) do
{:noreply, LiveClient.handle_info(msg, socket)}
end
endMultiple Runs
Track multiple runs simultaneously using the :as option:
socket = LiveClient.start_flow(socket, :flow_a, input, as: :run_a)
socket = LiveClient.subscribe(socket, some_run_id, as: :run_b)Then access @run_a and @run_b independently in templates.
Jobs
Jobs work identically — use enqueue/4 for naming parity with PgFlow.enqueue/2:
LiveClient.enqueue(socket, MyApp.Jobs.SendEmail, %{"to" => "user@example.com"})
Summary
Functions
Starts a job and subscribes to real-time updates.
Handles a PgFlow PubSub message, updating the tracked run in assigns.
Initializes the socket for flow tracking.
Starts a flow and subscribes to real-time updates.
Subscribes to an existing run's real-time updates.
Unsubscribes from a run's updates and resets the assign to nil.
Functions
@spec enqueue(Phoenix.LiveView.Socket.t(), module(), map(), keyword()) :: {:ok, Phoenix.LiveView.Socket.t()} | {:error, term(), Phoenix.LiveView.Socket.t()}
Starts a job and subscribes to real-time updates.
Convenience wrapper providing naming parity with PgFlow.enqueue/2.
Behaves identically to start_flow/4.
Options
:as— assign key (default::flow_run)
@spec handle_info({:pgflow, String.t(), tuple()}, Phoenix.LiveView.Socket.t()) :: Phoenix.LiveView.Socket.t()
Handles a PgFlow PubSub message, updating the tracked run in assigns.
Call this from your LiveView's handle_info/2:
def handle_info({:pgflow, _, _} = msg, socket) do
{:noreply, LiveClient.handle_info(msg, socket)}
endReturns the socket unchanged if the message is for an untracked run.
@spec init( Phoenix.LiveView.Socket.t(), keyword() ) :: Phoenix.LiveView.Socket.t()
Initializes the socket for flow tracking.
Sets the default assign to nil and stores PubSub config in socket private.
Safe to call during both connected and disconnected mount phases.
Options
:pubsub— (required) the Phoenix.PubSub module:as— assign key for the run (default::flow_run):repo— Ecto repo for snapshot loading (default: uses configured PgFlow repo)
@spec start_flow( Phoenix.LiveView.Socket.t(), module() | atom() | String.t(), map(), keyword() ) :: {:ok, Phoenix.LiveView.Socket.t()} | {:error, term(), Phoenix.LiveView.Socket.t()}
Starts a flow and subscribes to real-time updates.
Creates the flow run via PgFlow.Client.start_flow/2, subscribes to the
run's PubSub topic, loads the current state from the database, and assigns
the %Run{} struct.
Returns {:ok, socket} on success or {:error, reason, socket} on failure.
Options
:as— assign key (default::flow_run)
@spec subscribe(Phoenix.LiveView.Socket.t(), String.t(), keyword()) :: Phoenix.LiveView.Socket.t()
Subscribes to an existing run's real-time updates.
Loads the current state from the database and subscribes to PubSub. Useful for observing runs started elsewhere (e.g., run detail pages).
Options
:as— assign key (default::flow_run)
@spec unsubscribe( Phoenix.LiveView.Socket.t(), keyword() ) :: Phoenix.LiveView.Socket.t()
Unsubscribes from a run's updates and resets the assign to nil.
Options
:as— assign key to unsubscribe (default::flow_run)