DurableStreams.LiveView (Streamkeeper v0.3.0)

Helpers for consuming durable streams in Phoenix LiveView.

This module provides a simple, explicit API for integrating durable streams into LiveView applications. It handles the long-polling loop, connection management, and message routing while leaving application-specific logic to your LiveView.

Usage

defmodule MyAppWeb.EventsLive do
  use Phoenix.LiveView
  alias DurableStreams.LiveView, as: DSLive

  def mount(_params, _session, socket) do
    {:ok, DSLive.init(socket)}
  end

  def handle_event("subscribe", %{"stream_id" => stream_id}, socket) do
    {:noreply, DSLive.listen(socket, stream_id)}
  end

  def handle_event("unsubscribe", _, socket) do
    {:noreply, DSLive.stop(socket)}
  end

  # Handle stream messages
  def handle_info(msg, socket) do
    if DSLive.stream_message?(msg) do
      case DSLive.handle_message(socket, msg) do
        {:data, messages, socket} ->
          # Process messages your way
          {:noreply, process_messages(socket, messages)}

        {:status, _status, socket} ->
          {:noreply, socket}

        {:complete, socket} ->
          {:noreply, assign(socket, :finished, true)}

        {:error, reason, socket} ->
          {:noreply, assign(socket, :error, reason)}
      end
    else
      # Handle other messages
      {:noreply, socket}
    end
  end

  defp process_messages(socket, messages) do
    Enum.reduce(messages, socket, fn msg, acc ->
      update(acc, :events, &[msg.data | &1])
    end)
  end
end

Socket Assigns

This module uses the following assigns (prefixed with ds_ to avoid conflicts):

  • :ds_stream_id - The current stream ID being listened to
  • :ds_offset - The current offset in the stream
  • :ds_status - Connection status (:idle, :connecting, :streaming, :disconnected)
  • :ds_listener_pid - PID of the listener process
  • :ds_listener_ref - Monitor reference for the listener
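As an illustration of how these assigns might be consumed, the sketch below maps each documented :ds_status value to display text. The module name StatusLabelSketch and the label strings are hypothetical, and a plain map stands in for socket.assigns so the snippet runs without Phoenix.

```elixir
defmodule StatusLabelSketch do
  @moduledoc false

  # One clause per documented :ds_status value.
  # The label strings are illustrative and not part of the library.
  def label(:idle), do: "Not connected"
  def label(:connecting), do: "Connecting..."
  def label(:streaming), do: "Live"
  def label(:disconnected), do: "Disconnected"
end

# Plain map standing in for socket.assigns (values are made up).
assigns = %{ds_stream_id: "orders", ds_offset: "-1", ds_status: :streaming}
IO.puts(StatusLabelSketch.label(assigns.ds_status))
```

In a real LiveView the same function could be called from the template with the :ds_status assign.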

Options

init/2 and listen/3 accept options:

  • :timeout - Long-poll timeout in milliseconds (default: 30_000)
  • :offset - Starting offset, documented for listen/3 only (default: "-1", the beginning of the stream)
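The documented defaults suggest a precedence order: per-call options first, then values stored earlier (for example by init/2), then the built-in defaults. The sketch below models that order with Keyword.merge/2. It is an assumption inferred from the option descriptions, not the library's actual implementation; OptionDefaultsSketch and the offset "42" are hypothetical.

```elixir
defmodule OptionDefaultsSketch do
  @moduledoc false

  # Built-in defaults as documented: 30_000 ms timeout, offset "-1".
  @builtin [timeout: 30_000, offset: "-1"]

  # Keyword.merge/2 lets the second list win on duplicate keys, so
  # per-call options override stored values, which override the defaults.
  def resolve(stored, call_opts) do
    @builtin
    |> Keyword.merge(stored)
    |> Keyword.merge(call_opts)
  end
end

# Stored timeout of 10 seconds, per-call starting offset (made-up value).
resolved = OptionDefaultsSketch.resolve([timeout: 10_000], [offset: "42"])
IO.inspect(Keyword.get(resolved, :timeout))
IO.inspect(Keyword.get(resolved, :offset))
```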

Summary

Functions

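  • handle_message(socket, arg) - Handles a stream message, returning the result and updated socket.
  • init(socket, opts \\ []) - Initializes stream-related assigns on the socket.
  • listen(socket, stream_id, opts \\ []) - Starts listening to a durable stream.
  • listening?(socket) - Checks if currently listening to a stream.
  • offset(socket) - Returns the current offset in the stream.
  • reset(socket) - Resets all stream state, clearing the stream ID and offset.
  • status(socket) - Returns the current stream status.
  • stop(socket) - Stops listening to the current stream.
  • stream_id(socket) - Returns the current stream ID, or nil if not listening.
  • stream_message?(arg1) - Checks if a message is from the stream listener.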

Functions

handle_message(socket, arg)

@spec handle_message(Phoenix.LiveView.Socket.t(), term()) ::
  {:data, [map()], Phoenix.LiveView.Socket.t()}
  | {:status, atom(), Phoenix.LiveView.Socket.t()}
  | {:complete, Phoenix.LiveView.Socket.t()}
  | {:error, term(), Phoenix.LiveView.Socket.t()}

Handles a stream message, returning the result and updated socket.

Returns one of:

  • {:data, messages, socket} - New data received; messages is a list of maps shaped like %{data: binary, offset: string}
  • {:status, status, socket} - Status changed (:connecting, :streaming, :disconnected)
  • {:complete, socket} - Stream is closed, no more data
  • {:error, reason, socket} - An error occurred
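To make the four result shapes concrete, here is a minimal sketch that folds each one into a plain map. The map stands in for the socket so the snippet runs without Phoenix; ResultFoldSketch, its state keys, and the sample messages are hypothetical.

```elixir
defmodule ResultFoldSketch do
  @moduledoc false

  # `state` is a plain map standing in for the socket assigns.
  # Each message is expected to look like %{data: binary, offset: string}.
  def apply_result({:data, messages, state}) do
    Enum.reduce(messages, state, fn %{data: data, offset: offset}, acc ->
      # Prepend the payload (newest first) and remember the latest offset.
      %{acc | events: [data | acc.events], last_offset: offset}
    end)
  end

  def apply_result({:status, status, state}), do: %{state | status: status}
  def apply_result({:complete, state}), do: %{state | finished: true}
  def apply_result({:error, reason, state}), do: %{state | error: reason}
end

state = %{events: [], last_offset: nil, status: :idle, finished: false, error: nil}
messages = [%{data: "a", offset: "1"}, %{data: "b", offset: "2"}]

state = ResultFoldSketch.apply_result({:data, messages, state})
IO.inspect(state.events)
IO.inspect(state.last_offset)
```

Because events are prepended, the newest payload ends up at the head of the list, matching the process_messages/2 helper in the Usage section.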

Example

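def handle_info(msg, socket) do
  # Assumes msg came from the stream listener; check stream_message?/1 first
  # if this LiveView also receives other messages.
  case DurableStreams.LiveView.handle_message(socket, msg) do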
    {:data, messages, socket} ->
      {:noreply, process_messages(socket, messages)}
    {:status, _status, socket} ->
      {:noreply, socket}
    {:complete, socket} ->
      {:noreply, socket}
    {:error, reason, socket} ->
      {:noreply, assign(socket, :error, reason)}
  end
end

init(socket, opts \\ [])

Initializes stream-related assigns on the socket.

Call this in your mount/3 callback to set up the required assigns.

Options

  • :timeout - Default long-poll timeout in milliseconds (default: 30_000)

Example

def mount(_params, _session, socket) do
  {:ok, DurableStreams.LiveView.init(socket)}
end

listen(socket, stream_id, opts \\ [])

Starts listening to a durable stream.

This spawns a background process that long-polls the stream and sends messages back to the LiveView. Messages are handled via handle_message/2.

Options

  • :offset - Starting offset (default: current offset or "-1")
  • :timeout - Long-poll timeout in milliseconds (default: from init)

Example

def handle_event("subscribe", %{"stream_id" => id}, socket) do
  {:noreply, DurableStreams.LiveView.listen(socket, id)}
end
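The background-process pattern described for listen/3 can be sketched in plain Elixir: a monitored process sends batches to its parent and then signals completion. This is only an illustration of the general pattern, not the library's listener. ListenerSketch and the {:demo_stream, ...} message shape are made up, and a fixed list of batches replaces real long-poll requests.

```elixir
defmodule ListenerSketch do
  @moduledoc false

  # Spawns a monitored process that sends each batch to `parent`,
  # followed by a completion message. Returns {pid, monitor_ref}.
  def start(parent, batches) do
    spawn_monitor(fn ->
      Enum.each(batches, fn batch ->
        send(parent, {:demo_stream, {:data, batch}})
      end)

      send(parent, {:demo_stream, :complete})
    end)
  end

  # Collects batches until the listener reports completion, exits
  # before completing, or nothing arrives within one second.
  def collect(ref, acc \\ []) do
    receive do
      {:demo_stream, {:data, batch}} -> collect(ref, acc ++ batch)
      {:demo_stream, :complete} -> {:complete, acc}
      {:DOWN, ^ref, :process, _pid, reason} -> {:down, reason, acc}
    after
      1_000 -> {:timeout, acc}
    end
  end
end

{_pid, ref} = ListenerSketch.start(self(), [["a"], ["b", "c"]])
IO.inspect(ListenerSketch.collect(ref))
```

Monitoring the listener lets the parent notice a crash through the :DOWN message, which is presumably why the module keeps both :ds_listener_pid and :ds_listener_ref in the assigns.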

listening?(socket)

@spec listening?(Phoenix.LiveView.Socket.t()) :: boolean()

Checks if currently listening to a stream.

offset(socket)

@spec offset(Phoenix.LiveView.Socket.t()) :: String.t()

Returns the current offset in the stream.

reset(socket)

Resets all stream state, clearing the stream ID and offset.

Use this when navigating away from a stream entirely.

Example

def handle_params(_params, _uri, socket) do
  {:noreply, DurableStreams.LiveView.reset(socket)}
end

status(socket)

@spec status(Phoenix.LiveView.Socket.t()) :: atom()

Returns the current stream status.

Possible values: :idle, :connecting, :streaming, :disconnected

stop(socket)

Stops listening to the current stream.

This terminates the listener process and resets the connection status. The stream ID and offset are preserved for potential reconnection.

Example

def handle_event("disconnect", _, socket) do
  {:noreply, DurableStreams.LiveView.stop(socket)}
end
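The difference between stop/1 and reset/1 can be modelled on a plain map of the documented ds_ assigns. This is a sketch of the documented behaviour, not the library's code. Two details are assumptions: that the status returns to :idle after stopping, and that a cleared offset falls back to "-1". StopVsResetSketch and the sample values are hypothetical.

```elixir
defmodule StopVsResetSketch do
  @moduledoc false

  # Stopping drops the listener but keeps the stream ID and offset.
  # Assumption: the status goes back to :idle.
  def stop(assigns) do
    %{assigns | ds_status: :idle, ds_listener_pid: nil, ds_listener_ref: nil}
  end

  # Resetting additionally clears the stream ID and offset.
  # Assumption: a cleared offset is the documented default "-1".
  def reset(assigns) do
    %{stop(assigns) | ds_stream_id: nil, ds_offset: "-1"}
  end
end

assigns = %{
  ds_stream_id: "orders",
  ds_offset: "17",
  ds_status: :streaming,
  ds_listener_pid: self(),
  ds_listener_ref: make_ref()
}

stopped = StopVsResetSketch.stop(assigns)
IO.inspect({stopped.ds_stream_id, stopped.ds_offset})

cleared = StopVsResetSketch.reset(assigns)
IO.inspect({cleared.ds_stream_id, cleared.ds_offset})
```

After a stop, the preserved stream ID and offset are what make a later listen/3 call able to resume where the stream left off.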

stream_id(socket)

@spec stream_id(Phoenix.LiveView.Socket.t()) :: String.t() | nil

Returns the current stream ID, or nil if not listening.

stream_message?(arg1)

@spec stream_message?(term()) :: boolean()

Checks if a message is from the stream listener.

Use this check in your handle_info/2 to route stream messages. The example below calls it in an if expression rather than in a when clause.

Example

def handle_info(msg, socket) do
  if DurableStreams.LiveView.stream_message?(msg) do
    # Handle stream message (for example via handle_message/2)
    {:noreply, socket}
  else
    # Handle other messages
    {:noreply, socket}
  end
end