View Source EventStreamex.EventListener (EventStreamex v1.3.1)
Listens for new database WAL events in a Phoenix.LiveView
.
As soon as something happens in the database (insert, delete, update),
a WAL event is sent and dispatched to operators that listen to it, and to
live views that uses the EventStreamex.EventListener
module.
Let's say you have an entity named comments
and you have a live view that
shows a list of thoses comments.
For a better user experience you would like to synchronize this list in realtime as soon as a new comment is created.
For this matter you will use the EventStreamex.EventListener
module in your live view:
defmodule MyApp.CommentLive.Index do
use MyApp, :live_view
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [%{scopes: [post_id: "posts"]}]
alias MyApp.Blog
alias MyApp.Blog.Comment
@impl true
def mount(params, session, socket) do
super(params, session, socket)
end
@impl true
def handle_params(%{"post_id" => post_id} = params, url, socket) do
{_res, socket} = super(params, url, socket)
{:noreply,
socket
|> stream(:comments, Blog.list_comments(post_id))
|> assign(:post_id, post_id)
end
@impl true
def handle_info(
{:on_insert, [{"posts", post_id}], "comments", comment},
%{assigns: %{post_id: post_id}} = socket
) do
{:noreply,
socket
|> stream_insert(:comments, comment.new_record)
end
@impl true
def handle_info(
{:on_update, [{"posts", post_id}], "comments", comment},
%{assigns: %{post_id: post_id}} = socket
) do
{:noreply,
socket
|> stream_insert(:comments, comment.new_record)
end
@impl true
def handle_info(
{:on_delete, [{"posts", post_id}], "comments", comment},
%{assigns: %{post_id: post_id}} = socket
) do
{:noreply,
socket
|> stream_delete(:comments, comment.old_record)
end
end
This code will update the comments list as soon as a new comment is either
inserted, deleted or udpated (notice that here, comments are linked to a post
via the post_id
field and we load only the comments for a specific post).
For this code to work you will also have to use the EventStreamex.Events
module to mark the entity for WAL listening. Without doing so, the comments
entity will not be listened from WAL events, and thus, will not be dispatched to operators and live views
How it works
use EventStreamex.EventListener
will do the "magic" by subscribing to the entity
changes in Phoenix.LiveView.mount/3
, Phoenix.LiveView.terminate/3
and Phoenix.LiveView.handle_params/3
callbacks.
That means that is you override these callbacks you have to call the super
function
so that the "magic" is done.
The "magic" in question is a subscription to several channels in a pubsub module. There are 3 kinds a channels this module will automatically subscribe to:
direct
: We subscribe to a specific entity changes (by it's ID)unscoped
: We subscribes to changes of all entities in a tablescopes
: We subscribe to changes in entities that match a specific scope (like having a specificpost_id
in the example above)
We can use the 3 kinds a subscriptions at the same time:
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [:direct, :unscoped, %{scopes: [post_id: "posts"]}]
By default, we automatically subscribe to the direct
and unscoped
channels.
All events are received in Phoenix.LiveView.handle_info/2
callbacks with a message of this form:
{:on_insert | :on_delete | :on_update, :direct | [] | [{binary(), id()}], binary(), entity_change()}
More information about each kind of message in the subsections below
:direct
The :direct
channel subscribes to a specific entity by its id
field.
This is most usefull in show
or edit
views where you only need to show one specific entity.
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [:direct]
That means that if you want to subscription to be effective, you will have to receive the id of entity
in the params.
Or, pass it manually when you call the super
function:
@impl true
def mount(_params, session, socket) do
super(%{"id" => get_resource_id(session)}, session, socket)
end
The changes in the entity will be received in the Phoenix.LiveView.handle_info/2
callback
with a message of this form:
{:on_insert | :on_update | :on_delete, :direct, binary(), entity_change()}
Here is an example:
defmodule MyApp.CommentLive.Show do
use MyApp, :live_view
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [:direct]
alias MyApp.Blog
@impl true
def mount(params, session, socket) do
super(params, session, socket)
end
@impl true
def handle_params(%{"id" => id} = params, url, socket) do
{_res, socket} = super(params, url, socket)
{:noreply,
socket
|> assign(:id, id)
|> assign(:comment, Blog.get_comment!(id))}
end
@impl true
def handle_info({:on_insert, :direct, "comments", _comment}, socket) do
# Should never happen because the entity already exists
{:noreply, socket}
end
@impl true
def handle_info({:on_update, :direct, "comments", comment}, socket) do
{:noreply,
socket
|> assign(:comment, comment.new_record)
end
@impl true
def handle_info({:on_delete, :direct, "comments", _comment}, socket) do
# Do some redirection stuff eventually
{:noreply, socket |> put_flash(:warning, "This comment has been deleted")}
end
end
:unscoped
The :unscoped
channel subscribes to changes of all entities.
This is what you will use for index
views when your entity is not scoped.
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [:unscoped]
The changes in the entities will be received in the Phoenix.LiveView.handle_info/2
callback
with a message of this form:
{:on_insert | :on_update | :on_delete, [], binary(), entity_change()}
Here is an example:
defmodule MyApp.CommentLive.Index do
use MyApp, :live_view
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [:unscoped]
alias MyApp.Blog
alias MyApp.Blog.Comment
@impl true
def mount(params, session, socket) do
super(params, session, socket)
end
@impl true
def handle_params(params, url, socket) do
{_res, socket} = super(params, url, socket)
{:noreply,
socket
|> stream(:comments, Blog.list_comments())
end
@impl true
def handle_info(
{:on_insert, [], "comments", comment}, socket
) do
{:noreply,
socket
|> stream_insert(:comments, comment.new_record)
end
@impl true
def handle_info(
{:on_update, [], "comments", comment}, socket
) do
{:noreply,
socket
|> stream_insert(:comments, comment.new_record)
end
@impl true
def handle_info(
{:on_delete, [],"comments", comment}, socket
) do
{:noreply,
socket
|> stream_delete(:comments, comment.old_record)
end
end
:scopes
The :scopes
channel subscribes to changes of all entities that match some
specific field values.
To declare a scoped entity you will do like this:
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [%{scopes: [post_id: "posts"]}]
Here, comments
have a post_id
field related to a posts
entity.
You can also have several scopes for an entity (The order matters for the matching in Phoenix.LiveView.handle_info/2
):
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [%{scopes: [org_id: "organizations", post_id: "posts"]}]
Here, comments
have a org_id
field related to a organizations
entity, and a post_id
field, related to a posts
entity.
As for :direct
scopes, you will have to receive the scope fields in the params so that the module
is able to subscribe to the correct channels.
If your fields are not in the params or are named differently, you will have to pass them yourself to the super
function:
@impl true
def handle_params(%{"my_post_id" => my_post_id, "my_org_id" => my_org_id} = params, url, socket) do
{_res, socket} = super(%{"post_id" => my_post_id, "org_id" => my_org_id}, url, socket)
{:noreply,
socket
|> stream(:comments, Blog.list_comments(my_post_id, my_org_id))
end
Events will be received in the c:Phoenix.LiveView.handle_info/2
callback, with messages of this form:
{:on_insert | :on_update | :on_delete, [{"related_scoped_entity", scope_id}], binary(), entity_change()}
For instance, with our previous example, an insert event message will look like this:
{:on_insert, [{"organizations", org_id}, {"posts", post_id}], "comments", entity_change}
The order of scopes will be the same as the one you specified above in the use
.
Here is a full example:
defmodule MyApp.CommentLive.Index do
use MyApp, :live_view
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [%{scopes: [post_id: "posts"]}]
alias MyApp.Blog
alias MyApp.Blog.Comment
@impl true
def mount(params, session, socket) do
super(params, session, socket)
end
@impl true
def handle_params(%{"post_id" => post_id} = params, url, socket) do
{_res, socket} = super(params, url, socket)
{:noreply,
socket
|> stream(:comments, Blog.list_comments(post_id))
|> assign(:post_id, post_id)
end
@impl true
def handle_info(
{:on_insert, [{"posts", post_id}], "comments", comment},
%{assigns: %{post_id: post_id}} = socket
) do
{:noreply,
socket
|> stream_insert(:comments, comment.new_record)
end
@impl true
def handle_info(
{:on_update, [{"posts", post_id}], "comments", comment},
%{assigns: %{post_id: post_id}} = socket
) do
{:noreply,
socket
|> stream_insert(:comments, comment.new_record)
end
@impl true
def handle_info(
{:on_delete, [{"posts", post_id}], "comments", comment},
%{assigns: %{post_id: post_id}} = socket
) do
{:noreply,
socket
|> stream_delete(:comments, comment.old_record)
end
end
Entity change structure
The event received in the Phoenix.LiveView.handle_info/2
callbacks have information about
the entity and its changes.
Here is what the structure looks like:
%WalEx.Event{
name: atom(),
type: :insert | :update | :delete,
source: %WalEx.Event.Source{
name: String.t(),
version: String.t(),
db: String.t(),
schema: String.t(),
table: String.t(),
columns: map()
},
new_record: map() | nil,
old_record: map() | nil,
changes: map() | nil,
timestamp: DateTime.t(),
lsn: {integer(), integer()}
}
I am using the WalEx package internally
name
: The name of the entity (ie::comments
for a table namedcomments
)type
: The type of event betweeninsert
,update
anddelete
source
: Information about the event:name
: "WalEx"version
: Current version ofWalEx
db
: The name of the databaseschema
: Mostly"public"
table
: The name of the table (ie:"comments"
)columns
: A map of fields with their type (ie:%{"id": "integer", "message": "varchar"}
)
new_record
: The entity itself forinsert
andupdate
events.nil
fordelete
events.old_record
: The entity itself fordelete
events.nil
forinsert
andupdate
events.changes
: A map with the changes in the entity inupdate
events,nil
otherwise (see below)timestamp
: The timstamp of the event inDateTime
typelsn
: A tuple containing information about the publication cursor
changes
When you receive an update
event, you will also have the changes
field set to a map containing
the changes the entity received since the update.
This map contains the changed fields as keys, and a map describing the change as value. This "change" map contains 2 fields:
old_value
: The value before the updatenew_value
: The value after the update
For instance, let's say you have a comments
entity with 4 fields: id
, message
, rating
, timestamp
.
You have a comment with these values:
%Comment{
id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
message: "This is my first comment",
rating: 4,
timestamp: "2024-07-23T12:00:00Z"
}
Now, the comment is update this these new values:
%Comment{
id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
message: "This is (not) my first comment (anymore)",
rating: 5,
timestamp: "2024-07-23T12:00:00Z"
}
The event structure will look like this:
%WalEx.Event{
name: :comments,
type: :update,
source: %WalEx.Event.Source{
name: "WalEx",
version: "4.4.0",
db: "postgresql",
schema: "public",
table: "comments",
columns: %{
id: "uuid",
message: "varchar",
rating: "integer",
timestamp: "datetime"
}
},
new_record: %Comment{
id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
message: "This is (not) my first comment (anymore)",
rating: 5,
timestamp: "2024-07-23T12:00:00Z"
},
old_record: nil,
changes: %{
message: %{
old_value: "This is my first comment",
new_value: "This is (not) my first comment (anymore)"
},
rating: %{
old_value: 4,
new_value: 5
}
},
timestamp: "2024-08-25T13:13:30Z",
lsn: {0, 0}
}
Unsubscribing from events
The unsubscribe from events is done automatically in the c:Phoenix.LiveView.terminate/3
callback.
You do not have anything to do except for calling the super
function if you override this callback.
Handling subscriptions later
If you need, for any reason, to handle subscriptions at another moment than the mount
and handle_params
callbacks,
we provide the handle_subscriptions/2
function.
This can be useful if the parameters used for scoped channels are handled in a different way than just getting them from query parameters.
Params
socket
: The current socketparams
: A map containing the parameters to use for scoped channels. Each scope field must be present in the params map as a string
Returns
The updated socket
Example
defmodule MyModule do
use EventStreamex.EventListener,
schema: "comments",
subscriptions: [%{scopes: [post_id: "posts"]}]
def handle_params(%{"id" => id}, _url, socket) do
entity = MyEntity.get(id)
{:noreply,
handle_subscriptions(
socket,
%{"post_id" => entity.post_id}
)}
end
end
In this example, I need a scoped channel with a post_id
field.
This field is not present in the url (which is a bad practice), but
I know I can find it in the entity I fetch.
Here, I don't have to call the super()
function because I do not need
the EventListener
to handle the subscriptions as I do it manually.
You don't have to handle the unsubscribe either because it will be done for you in the
Phoenix.LiveView.terminate/3
callback.
Subscribing to other entities' events
Let's say you display posts and handle their creation. And when a new post is created you want to redirect to the detail page of this new post. But, the enitty used for the detail of this post is a derived entity from an operator. Thus, you can't be sure the entity will exist when you redirect to its detail page because of this asynchronicity of the event streaming architecture.
So you would like to listen for this entity beeing created. Hopefully, you know its ID because you just created the post and this derived entity uses the same ID.
So you can call the subscribe_entiy/4
function to listen for the events coming from
this derived entity.
subscribe_entity(socket, "post_with_comments_count", :direct, %{"id" => post_id})
Params
socket
: The socketentity_name
: The entity name to listen to as a stringsubscription
: The kind of channel you want to listen to (these are the same as for the module configuration::direct
,:unscoped
,%{scopes: []}
)params
: A map with the parameters needed for:direct
and:scopes
scopes.
Return value
The updated socket
Received events
Events are received the same way as other events but related to the entity:
@impl true
def handle_info({:on_insert, :direct, "post_with_comments_count", post}, socket) do
{:noreply, socket |> push_navigate(to: "/posts/#{post.id}")}
end
Unsubscribing to other entities' events
Use the function unsubscribe_entity/3
to manually unsubscribe from an event's events.
unsubscribe_entity(socket, "post_with_comments_count", :direct)
The params will be the same used for subscription so no need to pass them again.
Params
socket
: The socketentity_name
: The name of the entity to stop linsteningsubscription
: The kind of channel to stop listening to (these are the same as for the module configuration::direct
,:unscoped
,%{scopes: []}
)
Return value
The updated socket
use
params
When you use
this module, you will be able to specify these parameters:
:schema
: The name of the entity as a string (mandatory field):subscriptions
: The subscriptions you want to do (Defaults to[:direct, :unscoped]
):application
: The application module to use to retrieve config values (Defaults toApplication
)