EventStreamex.EventListener (EventStreamex v1.3.1)

Listens for new database WAL events in a Phoenix.LiveView.

As soon as something happens in the database (insert, delete, update), a WAL event is sent and dispatched to the operators that listen for it, and to the live views that use the EventStreamex.EventListener module.

Let's say you have an entity named comments and a live view that shows a list of those comments.

For a better user experience you would like to synchronize this list in real time as soon as a new comment is created.

To do this, use the EventStreamex.EventListener module in your live view:

defmodule MyApp.CommentLive.Index do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [%{scopes: [post_id: "posts"]}]

  alias MyApp.Blog
  alias MyApp.Blog.Comment

  @impl true
  def mount(params, session, socket) do
    super(params, session, socket)
  end

  @impl true
  def handle_params(%{"post_id" => post_id} = params, url, socket) do
    {_res, socket} = super(params, url, socket)

    {:noreply,
     socket
     |> stream(:comments, Blog.list_comments(post_id))
     |> assign(:post_id, post_id)}
  end

  @impl true
  def handle_info(
        {:on_insert, [{"posts", post_id}], "comments", comment},
        %{assigns: %{post_id: post_id}} = socket
      ) do
    {:noreply,
     socket
     |> stream_insert(:comments, comment.new_record)}
  end

  @impl true
  def handle_info(
        {:on_update, [{"posts", post_id}], "comments", comment},
        %{assigns: %{post_id: post_id}} = socket
      ) do
    {:noreply,
     socket
     |> stream_insert(:comments, comment.new_record)}
  end

  @impl true
  def handle_info(
        {:on_delete, [{"posts", post_id}], "comments", comment},
        %{assigns: %{post_id: post_id}} = socket
      ) do
    {:noreply,
     socket
     |> stream_delete(:comments, comment.old_record)}
  end
end

This code will update the comments list as soon as a comment is inserted, updated or deleted (notice that here, comments are linked to a post via the post_id field and we load only the comments for a specific post).

For this code to work you will also have to use the EventStreamex.Events module to mark the entity for WAL listening. Otherwise, WAL events for the comments entity will not be listened to, and thus will not be dispatched to operators and live views.

How it works

use EventStreamex.EventListener will do the "magic" by subscribing to the entity changes in the Phoenix.LiveView.mount/3 and Phoenix.LiveView.handle_params/3 callbacks, and by unsubscribing in the Phoenix.LiveView.terminate/2 callback.

That means that if you override these callbacks you have to call the super function so that the "magic" is done.

The "magic" in question is a subscription to several channels in a pubsub module. There are 3 kinds of channels this module will automatically subscribe to:

  • direct: We subscribe to the changes of a specific entity (by its ID)
  • unscoped: We subscribe to changes of all entities in a table
  • scopes: We subscribe to changes in entities that match a specific scope (like having a specific post_id in the example above)

We can use the 3 kinds of subscriptions at the same time:

use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [:direct, :unscoped, %{scopes: [post_id: "posts"]}]

By default, we automatically subscribe to the direct and unscoped channels.

All events are received in Phoenix.LiveView.handle_info/2 callbacks with a message of this form:

{:on_insert | :on_delete | :on_update, :direct | [] | [{binary(), id()}], binary(), entity_change()}

More information about each kind of message is given in the subsections below.
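
The second element of the tuple is enough to tell which channel delivered an event. As a minimal sketch (the module and function names are hypothetical and not part of EventStreamex), the three shapes can be told apart with plain pattern matching:

```elixir
defmodule ChannelKindSketch do
  # Hypothetical helper, not part of EventStreamex: it only illustrates
  # the three message shapes described above.
  @actions [:on_insert, :on_update, :on_delete]

  # :direct channel: the second element is the atom :direct
  def kind({action, :direct, entity, _change})
      when action in @actions and is_binary(entity),
      do: :direct

  # :unscoped channel: the second element is an empty list
  def kind({action, [], entity, _change})
      when action in @actions and is_binary(entity),
      do: :unscoped

  # :scopes channel: a non-empty list of {related_entity, id} tuples
  def kind({action, [{_related, _id} | _] = scopes, entity, _change})
      when action in @actions and is_binary(entity),
      do: {:scopes, scopes}
end
```

For instance, ChannelKindSketch.kind({:on_update, [{"posts", 42}], "comments", %{}}) returns {:scopes, [{"posts", 42}]}.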

:direct

The :direct channel subscribes to a specific entity by its id field. This is most useful in show or edit views where you only need to show one specific entity.

use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [:direct]

For the subscription to take effect, the id of the entity must be present in the params. Otherwise, pass it manually when you call the super function:

@impl true
def mount(_params, session, socket) do
  super(%{"id" => get_resource_id(session)}, session, socket)
end

The changes in the entity will be received in the Phoenix.LiveView.handle_info/2 callback with a message of this form:

{:on_insert | :on_update | :on_delete, :direct, binary(), entity_change()}

Here is an example:

defmodule MyApp.CommentLive.Show do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [:direct]

  alias MyApp.Blog

  @impl true
  def mount(params, session, socket) do
    super(params, session, socket)
  end

  @impl true
  def handle_params(%{"id" => id} = params, url, socket) do
    {_res, socket} = super(params, url, socket)

    {:noreply,
    socket
    |> assign(:id, id)
    |> assign(:comment, Blog.get_comment!(id))}
  end

  @impl true
  def handle_info({:on_insert, :direct, "comments", _comment}, socket) do
    # Should never happen because the entity already exists
    {:noreply, socket}
  end

  @impl true
  def handle_info({:on_update, :direct, "comments", comment}, socket) do
    {:noreply,
     socket
     |> assign(:comment, comment.new_record)}
  end

  @impl true
  def handle_info({:on_delete, :direct, "comments", _comment}, socket) do
    # Do some redirection stuff eventually
    {:noreply, socket |> put_flash(:warning, "This comment has been deleted")}
  end
end

:unscoped

The :unscoped channel subscribes to changes of all entities. This is what you will use for index views when your entity is not scoped.

use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [:unscoped]

The changes in the entities will be received in the Phoenix.LiveView.handle_info/2 callback with a message of this form:

{:on_insert | :on_update | :on_delete, [], binary(), entity_change()}

Here is an example:

defmodule MyApp.CommentLive.Index do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [:unscoped]

  alias MyApp.Blog
  alias MyApp.Blog.Comment

  @impl true
  def mount(params, session, socket) do
    super(params, session, socket)
  end

  @impl true
  def handle_params(params, url, socket) do
    {_res, socket} = super(params, url, socket)

    {:noreply,
     socket
     |> stream(:comments, Blog.list_comments())}
  end

  @impl true
  def handle_info(
        {:on_insert, [], "comments", comment}, socket
      ) do
    {:noreply,
     socket
     |> stream_insert(:comments, comment.new_record)}
  end

  @impl true
  def handle_info(
        {:on_update, [], "comments", comment}, socket
      ) do
    {:noreply,
     socket
     |> stream_insert(:comments, comment.new_record)}
  end

  @impl true
  def handle_info(
        {:on_delete, [], "comments", comment}, socket
      ) do
    {:noreply,
     socket
     |> stream_delete(:comments, comment.old_record)}
  end
end

:scopes

The :scopes channel subscribes to changes of all entities that match some specific field values.

To declare a scoped entity, configure the subscription like this:

use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [%{scopes: [post_id: "posts"]}]

Here, comments have a post_id field related to a posts entity.

You can also have several scopes for an entity (The order matters for the matching in Phoenix.LiveView.handle_info/2):

use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [%{scopes: [org_id: "organizations", post_id: "posts"]}]

Here, comments have an org_id field related to an organizations entity, and a post_id field related to a posts entity.

As with the :direct channel, the scope fields must be present in the params so that the module can subscribe to the correct channels. If your fields are not in the params or are named differently, you will have to pass them yourself to the super function:

@impl true
def handle_params(%{"my_post_id" => my_post_id, "my_org_id" => my_org_id} = params, url, socket) do
  {_res, socket} = super(%{"post_id" => my_post_id, "org_id" => my_org_id}, url, socket)

  {:noreply,
   socket
   |> stream(:comments, Blog.list_comments(my_post_id, my_org_id))}
end

Events will be received in the Phoenix.LiveView.handle_info/2 callback, with messages of this form:

{:on_insert | :on_update | :on_delete, [{"related_scoped_entity", scope_id}], binary(), entity_change()}

For instance, with our previous example, an insert event message will look like this:

{:on_insert, [{"organizations", org_id}, {"posts", post_id}], "comments", entity_change}

The order of scopes will be the same as the one you specified above in the use.
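
Because the scope list is ordered, a handle_info/2 clause for the two-scope subscription has to list the organizations tuple before the posts tuple. The sketch below is a plain module that uses a bare map in place of a LiveView socket (an assumption made so that it runs without Phoenix; the module name and the :comments assign are hypothetical). In a real live view the same patterns would go in the handle_info/2 callback:

```elixir
defmodule ScopeOrderSketch do
  # Hypothetical stand-in for a live view: `socket` is a plain map with an
  # :assigns key, so the example runs without Phoenix.

  # Matches only when both scope ids equal the ids kept in the assigns,
  # and only when the scopes arrive in the declared order.
  def handle_info(
        {:on_insert, [{"organizations", org_id}, {"posts", post_id}], "comments", change},
        %{assigns: %{org_id: org_id, post_id: post_id}} = socket
      ) do
    {:noreply, update_in(socket, [:assigns, :comments], &[change.new_record | &1])}
  end

  # Anything else (other scope ids, other order) is ignored
  def handle_info(_message, socket), do: {:noreply, socket}
end
```

A message whose scopes are listed as posts first and organizations second falls through to the catch-all clause, which is why the order declared in the use matters.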

Here is a full example:

defmodule MyApp.CommentLive.Index do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [%{scopes: [post_id: "posts"]}]

  alias MyApp.Blog
  alias MyApp.Blog.Comment

  @impl true
  def mount(params, session, socket) do
    super(params, session, socket)
  end

  @impl true
  def handle_params(%{"post_id" => post_id} = params, url, socket) do
    {_res, socket} = super(params, url, socket)

    {:noreply,
     socket
     |> stream(:comments, Blog.list_comments(post_id))
     |> assign(:post_id, post_id)}
  end

  @impl true
  def handle_info(
        {:on_insert, [{"posts", post_id}], "comments", comment},
        %{assigns: %{post_id: post_id}} = socket
      ) do
    {:noreply,
     socket
     |> stream_insert(:comments, comment.new_record)}
  end

  @impl true
  def handle_info(
        {:on_update, [{"posts", post_id}], "comments", comment},
        %{assigns: %{post_id: post_id}} = socket
      ) do
    {:noreply,
     socket
     |> stream_insert(:comments, comment.new_record)}
  end

  @impl true
  def handle_info(
        {:on_delete, [{"posts", post_id}], "comments", comment},
        %{assigns: %{post_id: post_id}} = socket
      ) do
    {:noreply,
     socket
     |> stream_delete(:comments, comment.old_record)}
  end
end

Entity change structure

The events received in the Phoenix.LiveView.handle_info/2 callbacks carry information about the entity and its changes.

Here is what the structure looks like:

%WalEx.Event{
  name: atom(),
  type: :insert | :update | :delete,
  source: %WalEx.Event.Source{
    name: String.t(),
    version: String.t(),
    db: String.t(),
    schema: String.t(),
    table: String.t(),
    columns: map()
  },
  new_record: map() | nil,
  old_record: map() | nil,
  changes: map() | nil,
  timestamp: DateTime.t(),
  lsn: {integer(), integer()}
}

I am using the WalEx package internally, which is why the event is a %WalEx.Event{} struct. Its fields are:

  • name: The name of the entity (ie: :comments for a table named comments)
  • type: The type of event between insert, update and delete
  • source: Information about the event:
    • name: "WalEx"
    • version: Current version of WalEx
    • db: The name of the database
    • schema: Mostly "public"
    • table: The name of the table (ie: "comments")
    • columns: A map of fields with their type (ie: %{"id": "integer", "message": "varchar"})
  • new_record: The entity itself for insert and update events. nil for delete events.
  • old_record: The entity itself for delete events. nil for insert and update events.
  • changes: A map with the changes in the entity in update events, nil otherwise (see below)
  • timestamp: The timestamp of the event, as a DateTime
  • lsn: A tuple containing information about the publication cursor
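
Since new_record is nil for delete events and old_record is nil for insert and update events, a handler that works for every event type has to pick the right field. A minimal sketch, assuming a hypothetical helper module (not part of EventStreamex or WalEx):

```elixir
defmodule EventRecordSketch do
  # Hypothetical helper, not part of EventStreamex or WalEx.
  # Plain maps are matched here so the example runs without WalEx;
  # a %WalEx.Event{} struct matches the same patterns.

  # Delete events only carry the old record
  def record(%{type: :delete, old_record: old}), do: old

  # Insert and update events carry the new record
  def record(%{type: type, new_record: new}) when type in [:insert, :update], do: new
end
```

Calling EventRecordSketch.record(event) then returns the entity regardless of the event type.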

changes

When you receive an update event, the changes field is also set, to a map describing what the update changed in the entity.

This map contains the changed fields as keys, and a map describing the change as value. This "change" map contains 2 fields:

  • old_value: The value before the update
  • new_value: The value after the update

For instance, let's say you have a comments entity with 4 fields: id, message, rating, timestamp.

You have a comment with these values:

%Comment{
  id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
  message: "This is my first comment",
  rating: 4,
  timestamp: "2024-07-23T12:00:00Z"
}

Now, the comment is updated with these new values:

%Comment{
  id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
  message: "This is (not) my first comment (anymore)",
  rating: 5,
  timestamp: "2024-07-23T12:00:00Z"
}

The event structure will look like this:

%WalEx.Event{
  name: :comments,
  type: :update,
  source: %WalEx.Event.Source{
    name: "WalEx",
    version: "4.4.0",
    db: "postgresql",
    schema: "public",
    table: "comments",
    columns: %{
      id: "uuid",
      message: "varchar",
      rating: "integer",
      timestamp: "datetime"
    }
  },
  new_record: %Comment{
    id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
    message: "This is (not) my first comment (anymore)",
    rating: 5,
    timestamp: "2024-07-23T12:00:00Z"
  },
  old_record: nil,
  changes: %{
    message: %{
      old_value: "This is my first comment",
      new_value: "This is (not) my first comment (anymore)"
    },
    rating: %{
      old_value: 4,
      new_value: 5
    }
  },
  timestamp: "2024-08-25T13:13:30Z",
  lsn: {0, 0}
}
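
Given an update event like the one above, the changes map can be walked like any other map. The following sketch (a hypothetical helper, not part of EventStreamex) turns it into a sorted list of human-readable lines, and returns an empty list when changes is nil, as it is for insert and delete events:

```elixir
defmodule ChangesSketch do
  # Hypothetical helper, not part of EventStreamex or WalEx.

  # Insert and delete events have no changes map
  def describe(nil), do: []

  # One line per changed field, sorted by field name for a stable output
  def describe(changes) when is_map(changes) do
    changes
    |> Enum.sort_by(fn {field, _change} -> to_string(field) end)
    |> Enum.map(fn {field, %{old_value: old, new_value: new}} ->
      "#{field}: #{inspect(old)} -> #{inspect(new)}"
    end)
  end
end
```

In a handle_info/2 clause for :on_update, this could be called as ChangesSketch.describe(comment.changes), for instance to log what changed.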

Unsubscribing from events

Unsubscribing from events is done automatically in the Phoenix.LiveView.terminate/2 callback. You have nothing to do except call the super function if you override this callback.

Handling subscriptions later

If you need, for any reason, to handle subscriptions at another moment than the mount and handle_params callbacks, we provide the handle_subscriptions/2 function.

This can be useful if the parameters used for scoped channels are handled in a different way than just getting them from query parameters.

Params

  • socket: The current socket
  • params: A map containing the parameters to use for scoped channels. Each scope field must be present in the params map as a string

Returns

The updated socket

Example

defmodule MyModule do
  use EventStreamex.EventListener,
    schema: "comments",
    subscriptions: [%{scopes: [post_id: "posts"]}]

  def handle_params(%{"id" => id}, _url, socket) do
    entity = MyEntity.get(id)

    {:noreply,
      handle_subscriptions(
        socket,
        %{"post_id" => entity.post_id}
      )}
  end
end

In this example, I need a scoped channel with a post_id field. This field is not present in the url (which is a bad practice), but I know I can find it in the entity I fetch.

Here, I don't have to call the super() function because I do not need the EventListener to handle the subscriptions as I do it manually.

You don't have to handle unsubscribing either because it will be done for you in the Phoenix.LiveView.terminate/2 callback.

Subscribing to other entities' events

Let's say you display posts and handle their creation, and when a new post is created you want to redirect to the detail page of this new post. But the entity used for the detail page is a derived entity produced by an operator. Because the event streaming architecture is asynchronous, you can't be sure this entity will exist when you redirect to its detail page.

So you would like to listen for this entity being created. Fortunately, you know its ID because you just created the post and this derived entity uses the same ID.

So you can call the subscribe_entity/4 function to listen for the events coming from this derived entity.

subscribe_entity(socket, "post_with_comments_count", :direct, %{"id" => post_id})

Params

  • socket: The socket
  • entity_name: The entity name to listen to as a string
  • subscription: The kind of channel you want to listen to (these are the same as for the module configuration: :direct, :unscoped, %{scopes: []})
  • params: A map with the parameters needed for the :direct and :scopes channels.

Return value

The updated socket

Received events

Events are received the same way as other events, with the entity name set to the entity you subscribed to:

@impl true
def handle_info({:on_insert, :direct, "post_with_comments_count", post}, socket) do
  # `post` is the entity change, so the entity itself is in `new_record`
  {:noreply, socket |> push_navigate(to: "/posts/#{post.new_record.id}")}
end

Unsubscribing from other entities' events

Use the function unsubscribe_entity/3 to manually unsubscribe from an entity's events.

unsubscribe_entity(socket, "post_with_comments_count", :direct)

The params used for the subscription are reused, so there is no need to pass them again.

Params

  • socket: The socket
  • entity_name: The name of the entity to stop listening to
  • subscription: The kind of channel to stop listening to (these are the same as for the module configuration: :direct, :unscoped, %{scopes: []})

Return value

The updated socket

use params

When you use this module, you will be able to specify these parameters:

  • :schema: The name of the entity as a string (mandatory field)
  • :subscriptions: The channels to subscribe to (Defaults to [:direct, :unscoped])
  • :application: The application module to use to retrieve config values (Defaults to Application)