View Source EventStreamex.Operators.Operator (EventStreamex v1.3.1)

An operator listens for events and will usually create/update/delete another entity based on those events.

This is a very usefull pattern. Here is why:

The issue without event streaming

Let's say you have a posts entity which contains:

  • title: a title
  • body: a body

You also have a comments entity which contains:

  • post_id: The ID of the post it is attached to
  • comment: The comment content

When you show a list of your posts you might want to show the number of comments it contains.

To do so you will be tempted to make a SQL query with a subquery that will count the comments of your posts.

This is pretty trivial:

SELECT
  p.id AS id,
  p.title AS title,
  p.body AS body,
  (SELECT COUNT(*) FROM comments c WHERE c.post_id = p.id) AS comments_count
FROM posts p;

But it has several flaws. Let's see them in more details.

Your queries will become slower and slower over time

Because subqueries are not free. They take more time to execute that a simple query with no join and subquery.

So, the more posts and comments you have, the slower the query will be.

You will end up adding complexity in your queries

As your app evolves, you will probably create new entities related to your posts and comments. Thus, you will add more and more subqueries and joins to your queries, making them slower and harder to maintain...

Realtime will become a mess to maintain

If you want to handle realtime events like updating the number of comments when a new one is created it will be a mess to maintain overtime because you will end up subscribing to a lot of events as you link your posts to other entities.

And you will probably forget to subscribe to some of the events making your app inconsistant (some features will be in realtime and other won't be).

Almost impossible to maintain recursivity

If your comments can have subcomments it will be almost impossible to maintain the comments count because of the recursivity it implies.

Let's say you create another entity named ratings which saves each rating given to a specific comment.

It would look like this:

  • comment_id: The ID of the comment related to this rating
  • rating: The given rating

Now imagine you want to display for every comment, the average rating and the total count of ratings for this comment.

Here again, you could solve this with some subqueries.

But what if you want to show the average rating of all comments in the post so that users can see that a post has received a lot of useful comments.

I don't even want to think about the query to build to do that...

Benefits from using event streaming

Let's come back to our posts and comments. Now let's imagine that instead of our subquery to count the comments we use an event streaming architecture and create another entity whose goal is specifically to show the posts with their comments count.

This post_with_comments_counts entity will look like that:

  • title: a title
  • body: a body
  • comments_count: the number of comments of the post

And this entity will listen to new comments to update its comments count: when a new comment is created it will increase the number of comments of the related post by 1.

With EventStreamex it would look like that:

defmodule MyApp.Operators.PostWithCommentsCountOperator do
  use EventStreamex.Operators.Operator,
    schema: "post_with_comments_counts",
    repo: MyApp.Repo

  on_event(
    all_events(),
    entities(["comments", "posts"]),
    query(
      "MERGE INTO post_with_comments_counts dest
      USING (
          SELECT
              p.id AS id,
              p.title AS title,
              p.body AS body,
              p.inserted_at AS inserted_at,
              p.updated_at AS updated_at,
              (SELECT COUNT(c.id) FROM comments c WHERE c.post_id = p.id) AS comments_count
          FROM posts p
          WHERE p.id = $1
      ) src ON src.id = dest.id
      WHEN NOT MATCHED THEN
        INSERT (id, title, body, inserted_at, updated_at, comments_count)
        VALUES (src.id, src.title, src.body, src.inserted_at, src.updated_at, src.comments_count)
      WHEN MATCHED THEN
        UPDATE SET
            title = src.title,
            body = src.body,
            inserted_at = src.inserted_at,
            updated_at = src.updated_at,
            comments_count = src.comments_count;",
      id([{"comments", :post_id}, {"posts", :id}]),
      delete_with(["posts"])
    )
  )
end

Here, we use the EventStreamex.Operators.Operator module to listen for posts and comments in all kinds of events (insert, update, delete).

And each time a post or a comment is inserted, updated, deleted, we perform this MERGE query to create/update our post_with_comments_counts.

The MERGE query will insert a new post if the post does not exist and update it if it already exists.

id/1 is used to map the fields of the listened entities to the id of the post_with_comments_counts entity (id for a post and post_id for a comment).

Finally, delete_with/1 tells the operator which entities deletion should also delete the related post_with_comments_counts entity.

As you can see, this MERGE query also uses a subquery, so why is it better than our first implementation?

This is a naive implementation

The query/3 macro is provided for simplicity. But you can also replace it by a function to do things as you think it's best.

This function will receive the event, allowing you to do whatever you want with it.

The query is executed in the background

In our first example, the query was executed everytime someone wants to display the list of posts.

In our event streaming example, the query is executed in the background each time there is an event with a post or a comment.

To display your list of posts you will just have to do:

SELECT * FROM post_with_comments_counts;

Which is simpler and faster than our former query.

Realtime is easy

You can use the module EventStreamex.EventListener to easily add realtime in your live view applications.

Because we are using PostgreSQL WAL events, everytime something happens on an entity (post_with_comments_counts as well), an event is sent.

This event will be processed by operators listening to it (like our operator here does for posts and comments), and it will be sent to a pubsub processor.

The EventStreamex.EventListener, now just has to listen to this pubsub processor to notify your live view the entity your are showing as been updated.

Everything, now, can be realtime, without beeing hard to maintain.

A better pattern to model your data

With this approach you keep your entities consistent and simple. And you create specialized entities for what you want to show in your views.

A post will stay a post with its title and body fields and a comment will stay a comment with a comment field.

post_with_comments_count is just a specialized entity used for your view.

This keeps your data and APIs consistent and simple with explicit fields and names.

For instance, let's say you have a users entity with a login and a password field. But later on, you realize that you need to store some information about your users, like its name, its address, and so on.

You will be tempted to update your users entity to add these new fields:

  • login
  • password
  • address
  • name

But still, in your application, when comes the moment to create your user, you only need the login and password.

So what do you do? You set the other fields as optionals in your API and set them a default value (empty string or null).

And in the view that allows a user to specify more information about him (name and address), you will either:

  • Use the same API endpoint as for the users creation
  • Or create a new dedicated API endpoint

But both approachs are bad practices!

In the first approach that means that your API expects 2 mandatory fields: login and password and 2 optional fields: name and address. But here, there is no sense in sending the login and password. And maybe you would like the name and address fields to be mandatory. But because your API set these fields as optionals, you will have to check for them from the client side. Which is a bad practice.

In the second approach, that means that you will create an API endpoint for the user profile. But this API, will finally update the users entity. You end up with hard to understand APIs because their name does not match to the entity being modified.

A better approach would be to have a users entity with:

  • login
  • password And its own API endpoint /users with only these two fields.

And you would have a user_profiles entity with:

  • user_id
  • name
  • address And its own API endpoint /users/{user_id}/user_profiles with only the name and address fields.

And what if you need to display all of these information in your view? Well, you use an operator like we did for the post_with_comments_counts. Named, for instance: user_with_profiles, and listening to users and user_profiles events.

This way, you keep your data and API consistent.

use params

When you use this module, you will be able to specify these parameters:

  • :schema: The name of the entity as a string (mandatory field)
  • :repo: The repository used for the connection to the database (mandatory field)
  • :sql_adapter: The SQL adapter to use (Defaults to Ecto.Adapters.SQL)
  • :id: The field used as an ID for the schema (Defaults to :id)

Summary

Types

The event received by the operator

Functions

Macro used as first parameter to on_event/3 to listen to all kinds of events

Sets the entities that must delete the final entity when deleted.

Macro used as second parameter to on_event/3 to listen to a list of entities. You have to use the name of the table in string.

Macro used as second parameter to on_event/3 to listen to only one entity. You have to use the name of the table in string.

Macro used as first parameter to on_event/3 to listen to only one event type from these items

Macro used as first parameter to on_event/3 to listen to a list of event types from these items

Sets the mapping of ids.

Specifies that no entity should delete the final entity when deleted.

This macro is the one listening the events.

Macro used as third parameter to on_event/3 to directly perform a query.

Types

@type event() :: %WalEx.Event{
  changes: map() | nil,
  lsn: {integer(), integer()},
  name: atom(),
  new_record: map() | nil,
  old_record: map() | nil,
  source: %WalEx.Event.Source{
    columns: map(),
    db: String.t(),
    name: term(),
    schema: String.t(),
    table: String.t(),
    version: String.t()
  },
  timestamp: DateTime.t(),
  type: :insert | :update | :delete
}

The event received by the operator

I am using the WalEx package internally

  • name: The name of the entity (ie: :comments for a table named comments)
  • type: The type of event between insert, update and delete
  • source: Information about the event:
    • name: "WalEx"
    • version: Current version of WalEx
    • db: The name of the database
    • schema: Mostly "public"
    • table: The name of the table (ie: "comments")
    • columns: A map of fields with their type (ie: %{"id": "integer", "message": "varchar"})
  • new_record: The entity itself for insert and update events. nil for delete events.
  • old_record: The entity itself for delete events. nil for insert and update events.
  • changes: A map with the changes in the entity in update events, nil otherwise (see below)
  • timestamp: The timstamp of the event in DateTime type
  • lsn: A tuple containing information about the publication cursor

changes

When you receive an update event, you will also have the changes field set to a map containing the changes the entity received since the update.

This map contains the changed fields as keys, and a map describing the change as value. This "change" map contains 2 fields:

  • old_value: The value before the update
  • new_value: The value after the update

For instance, let's say you have a comments entity with 4 fields: id, message, rating, timestamp.

You have a comment with these values:

%Comment{
  id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
  message: "This is my first comment",
  rating: 4,
  timestamp: "2024-07-23T12:00:00Z"
}

Now, the comment is update this these new values:

%Comment{
  id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
  message: "This is (not) my first comment (anymore)",
  rating: 5,
  timestamp: "2024-07-23T12:00:00Z"
}

The event structure will look like this:

%WalEx.Event{
  name: :comments,
  type: :update,
  source: %WalEx.Event.Source{
    name: "WalEx",
    version: "4.4.0",
    db: "postgresql",
    schema: "public",
    table: "comments",
    columns: %{
      id: "uuid",
      message: "varchar",
      rating: "integer",
      timestamp: "datetime"
    }
  },
  new_record: %Comment{
    id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
    message: "This is (not) my first comment (anymore)",
    rating: 5,
    timestamp: "2024-07-23T12:00:00Z"
  },
  old_record: nil,
  changes: %{
    message: %{
      old_value: "This is my first comment",
      new_value: "This is (not) my first comment (anymore)"
    },
    rating: %{
      old_value: 4,
      new_value: 5
    }
  },
  timestamp: "2024-08-25T13:13:30Z",
  lsn: {0, 0}
}

Functions

Link to this macro

all_events()

View Source (since 1.0.0) (macro)

Macro used as first parameter to on_event/3 to listen to all kinds of events:

  • :on_insert
  • :on_update
  • :on_delete
Link to this macro

delete_with(schemas)

View Source (since 1.0.0) (macro)

Sets the entities that must delete the final entity when deleted.

It takes a list of entities (as a list of strings).

This macro is used as third argument to the query/3 macro.

Example

delete_with(["posts"])
Link to this macro

entities(entities)

View Source (since 1.0.0) (macro)

Macro used as second parameter to on_event/3 to listen to a list of entities. You have to use the name of the table in string.

Link to this macro

entity(entity)

View Source (since 1.0.0) (macro)

Macro used as second parameter to on_event/3 to listen to only one entity. You have to use the name of the table in string.

Link to this macro

event(event)

View Source (since 1.0.0) (macro)

Macro used as first parameter to on_event/3 to listen to only one event type from these items:

  • :on_insert
  • :on_update
  • :on_delete
Link to this macro

events(events)

View Source (since 1.0.0) (macro)

Macro used as first parameter to on_event/3 to listen to a list of event types from these items:

  • :on_insert
  • :on_update
  • :on_delete
Link to this macro

id(mapping)

View Source (since 1.0.0) (macro)

Sets the mapping of ids.

The mapping is a list of tuples containing:

  • The entity name (as string)
  • The field that corresponds to the final entity's id

This macro is used as second argument to the query/3 macro.

Example

id([{"comments", :post_id}, {"posts", :id}])
Link to this macro

no_delete()

View Source (since 1.0.0) (macro)

Specifies that no entity should delete the final entity when deleted.

This macro is used as third argument to the query/3 macro.

Link to this macro

on_event(events, entities, callback)

View Source (since 1.0.0) (macro)
@spec on_event([:on_insert | :on_update | :on_delete], [binary()], (event() ->
                                                                {:ok, any()}
                                                                | {:error,
                                                                   any()})) ::
  any()

This macro is the one listening the events.

You have to define one and only one in your operator.

Parameters

  • events: A list of event types to listen to (:insert, :update, :delete). You can use the all_events/0, events/1 and event/1 macros to help you.
  • entities: A list of entities to list to (use their table name as string). You can use entities/1 and entity/1 macros to help you.
  • callback: A function that takes the event as parameter and must return a result. You can also use the query/3 macro instead.

Example

on_event(
  all_events(),
  entities(["comments", "posts"]),
  fn event ->
    {:ok, res} =
      event
      |> MyQuery.do_some_queries()
      |> MyQuery.then_another_query()
    {:ok, res}
  end
)
Link to this macro

query(query, ids_mapping, delete_with)

View Source (since 1.0.0) (macro)

Macro used as third parameter to on_event/3 to directly perform a query.

Parameters

  • query: The query to execute for each event
  • ids_mapping: A list of tuples to map the correct entities' field to the id of the final entity. You can use the id/1 macro.
  • delete_with: A list of entities that, when deleted, will also delete the final entity. You can use the delete_with/1 and no_delete/0 macros.

Example

on_event(
  all_events(),
  entities(["comments", "posts"]),
  query(
    "MERGE INTO post_with_comments_counts dest
    USING (
        SELECT
            p.id AS id,
            p.title AS title,
            p.body AS body,
            p.inserted_at AS inserted_at,
            p.updated_at AS updated_at,
            (SELECT COUNT(c.id) FROM comments c WHERE c.post_id = p.id) AS comments_count
        FROM posts p
        WHERE p.id = $1
    ) src ON src.id = dest.id
    WHEN NOT MATCHED THEN
      INSERT (id, title, body, inserted_at, updated_at, comments_count)
      VALUES (src.id, src.title, src.body, src.inserted_at, src.updated_at, src.comments_count)
    WHEN MATCHED THEN
      UPDATE SET
          title = src.title,
          body = src.body,
          inserted_at = src.inserted_at,
          updated_at = src.updated_at,
          comments_count = src.comments_count;",
    id([{"comments", :post_id}, {"posts", :id}]),
    delete_with(["posts"])
  )
)

Your query will have a $1 parameter which will correspond to the id of the final entity. This id is taken from the entity who emitted the event, based on the ids_mapping field.

For instance, for a comments event, the final entity will use the post_id field as his own id as set in the ids_mapping field.

This mapping is also used in the deletion of the entity.