DurableStreams.LiveView (Streamkeeper v0.3.0)
View Source
Helpers for consuming durable streams in Phoenix LiveView.
This module provides a simple, explicit API for integrating durable streams into LiveView applications. It handles the long-polling loop, connection management, and message routing while leaving application-specific logic to your LiveView.
Usage
defmodule MyAppWeb.EventsLive do
use Phoenix.LiveView
alias DurableStreams.LiveView, as: DSLive
def mount(_params, _session, socket) do
{:ok, DSLive.init(socket)}
end
def handle_event("subscribe", %{"stream_id" => stream_id}, socket) do
{:noreply, DSLive.listen(socket, stream_id)}
end
def handle_event("unsubscribe", _, socket) do
{:noreply, DSLive.stop(socket)}
end
# Handle stream messages
def handle_info(msg, socket) do
if DSLive.stream_message?(msg) do
case DSLive.handle_message(socket, msg) do
{:data, messages, socket} ->
# Process messages your way
{:noreply, process_messages(socket, messages)}
{:status, _status, socket} ->
{:noreply, socket}
{:complete, socket} ->
{:noreply, assign(socket, :finished, true)}
{:error, reason, socket} ->
{:noreply, assign(socket, :error, reason)}
end
else
# Handle other messages
{:noreply, socket}
end
end
defp process_messages(socket, messages) do
Enum.reduce(messages, socket, fn msg, acc ->
update(acc, :events, &[msg.data | &1])
end)
end
end
Socket Assigns
This module uses the following assigns (prefixed with ds_ to avoid conflicts):
* :ds_stream_id - The current stream ID being listened to
* :ds_offset - The current offset in the stream
* :ds_status - Connection status (:idle, :connecting, :streaming, :disconnected)
* :ds_listener_pid - PID of the listener process
* :ds_listener_ref - Monitor reference for the listener
Options
Both init/2 and listen/3 accept options:
* :timeout - Long-poll timeout in milliseconds (default: 30_000)
* :offset - Starting offset (default: "-1" for beginning)
Summary
Functions
handle_message/2 - Handles a stream message, returning the result and updated socket.
init/2 - Initializes stream-related assigns on the socket.
listen/3 - Starts listening to a durable stream.
listening?/1 - Checks if currently listening to a stream.
offset/1 - Returns the current offset in the stream.
reset/1 - Resets all stream state, clearing the stream ID and offset.
status/1 - Returns the current stream status.
stop/1 - Stops listening to the current stream.
stream_id/1 - Returns the current stream ID, or nil if not listening.
stream_message?/1 - Checks if a message is from the stream listener.
Functions
@spec handle_message(Phoenix.LiveView.Socket.t(), term()) :: {:data, [map()], Phoenix.LiveView.Socket.t()} | {:status, atom(), Phoenix.LiveView.Socket.t()} | {:complete, Phoenix.LiveView.Socket.t()} | {:error, term(), Phoenix.LiveView.Socket.t()}
Handles a stream message, returning the result and updated socket.
Returns one of:
* {:data, messages, socket} - New data received; messages is a list of %{data: binary, offset: string}
* {:status, status, socket} - Status changed (:connecting, :streaming, :disconnected)
* {:complete, socket} - Stream is closed, no more data
* {:error, reason, socket} - An error occurred
Example
def handle_info(msg, socket) do
case DurableStreams.LiveView.handle_message(socket, msg) do
{:data, messages, socket} ->
{:noreply, process_messages(socket, messages)}
{:status, _status, socket} ->
{:noreply, socket}
{:complete, socket} ->
{:noreply, socket}
{:error, reason, socket} ->
{:noreply, assign(socket, :error, reason)}
end
end
@spec init( Phoenix.LiveView.Socket.t(), keyword() ) :: Phoenix.LiveView.Socket.t()
Initializes stream-related assigns on the socket.
Call this in your mount/3 callback to set up the required assigns.
Options
* :timeout - Default long-poll timeout in milliseconds (default: 30_000)
Example
def mount(_params, _session, socket) do
{:ok, DurableStreams.LiveView.init(socket)}
end
@spec listen(Phoenix.LiveView.Socket.t(), String.t(), keyword()) :: Phoenix.LiveView.Socket.t()
Starts listening to a durable stream.
This spawns a background process that long-polls the stream and sends
messages back to the LiveView. Messages are handled via handle_message/2.
Options
* :offset - Starting offset (default: current offset or "-1")
* :timeout - Long-poll timeout in milliseconds (default: from init)
Example
def handle_event("subscribe", %{"stream_id" => id}, socket) do
{:noreply, DurableStreams.LiveView.listen(socket, id)}
end
@spec listening?(Phoenix.LiveView.Socket.t()) :: boolean()
Checks if currently listening to a stream.
@spec offset(Phoenix.LiveView.Socket.t()) :: String.t()
Returns the current offset in the stream.
@spec reset(Phoenix.LiveView.Socket.t()) :: Phoenix.LiveView.Socket.t()
Resets all stream state, clearing the stream ID and offset.
Use this when navigating away from a stream entirely.
Example
def handle_params(%{}, _uri, socket) do
{:noreply, DurableStreams.LiveView.reset(socket)}
end
@spec status(Phoenix.LiveView.Socket.t()) :: atom()
Returns the current stream status.
Possible values: :idle, :connecting, :streaming, :disconnected
@spec stop(Phoenix.LiveView.Socket.t()) :: Phoenix.LiveView.Socket.t()
Stops listening to the current stream.
This terminates the listener process and resets the connection status. The stream ID and offset are preserved for potential reconnection.
Example
def handle_event("disconnect", _, socket) do
{:noreply, DurableStreams.LiveView.stop(socket)}
end
@spec stream_id(Phoenix.LiveView.Socket.t()) :: String.t() | nil
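Returns the current stream ID, or nil if no stream is set.
Because stop/1 preserves the stream ID for potential reconnection, a non-nil value does not by itself mean a listener is running; use listening?/1 to check that.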
stream_message?/1
Checks if a message is from the stream listener.
Use this as a guard in your handle_info/2 to route stream messages.
Example
def handle_info(msg, socket) do
if DurableStreams.LiveView.stream_message?(msg) do
# Handle stream message
else
# Handle other messages
end
end