EventStreamex
Package to add event streaming in your Elixir application, using PostgreSQL WAL.
Works very well with Phoenix LiveView to add realtime to your app.
This package uses the PostgreSQL Write-Ahead Log (WAL) to listen for changes in database tables and then handle them in your application in 2 ways:
- To add realtime support in your views
- To update other entities in realtime (as soon as an entity it depends on is updated)
Full documentation can be found here.
Usage
Installation
The package can be installed by adding event_streamex to your list of dependencies in mix.exs:
def deps do
  [
    {:event_streamex, "~> 1.3.0"}
  ]
end
Configuration
Here are the configuration parameters for EventStreamex with their default values:
config :event_streamex,
  app_name: nil,
  database: "postgres",
  durable_slot: true,
  error_logger_adapter: {EventStreamex.Operators.Logger.LoggerAdapter, []},
  hostname: "localhost",
  operator_queue_backoff_multiplicator: 2,
  operator_queue_max_restart_time: 10000,
  operator_queue_max_retries: 5,
  operator_queue_min_restart_time: 500,
  password: "postgres",
  port: "5432",
  process_status_entity_field: :updated_at,
  process_status_storage_adapter: {EventStreamex.Operators.ProcessStatus.DbAdapter, []},
  publication: "events",
  pubsub: [adapter: Phoenix.PubSub, name: nil],
  queue_storage_adapter: {EventStreamex.Operators.Queue.DbAdapter, []},
  slot_name: "postgres_slot",
  url: "",
  username: "postgres"
- app_name: Your application name
- database: Name of the database to connect to
- durable_slot: Is the replication slot for the WAL temporary or durable? If you set it to false, you will lose all WAL events sent while your application was not running.
- error_logger_adapter: The logger module to use and the parameter sent to its start_link function
- hostname: Your PostgreSQL hostname
- operator_queue_backoff_multiplicator: Multiplicator used to compute the new wait time after an operator crash (See EventStreamex.Operators.Logger.ErrorLoggerAdapter)
- operator_queue_max_restart_time: Maximum time to wait in milliseconds before the restart of a crashed operator (See EventStreamex.Operators.Logger.ErrorLoggerAdapter)
- operator_queue_max_retries: Maximum number of restarts of a crashed operator (See EventStreamex.Operators.Logger.ErrorLoggerAdapter)
- operator_queue_min_restart_time: Time to wait in milliseconds before the first restart of a crashed operator (See EventStreamex.Operators.Logger.ErrorLoggerAdapter)
- password: Password used to connect to the database
- port: Port of the database
- process_status_entity_field: The field name used in your entities to represent the last updated time. It must also be set when the entity is created, and it should have microsecond precision (use the utc_datetime_usec type with Ecto). This field is used to update the EventStreamex.Operators.ProcessStatus registry.
- process_status_storage_adapter: Module used for process status storage and the parameter sent to the start_link function
- publication: Name of the publication used by the WAL (it is created automatically if it does not already exist)
- pubsub: Adapter to use for pubsub and its name
- queue_storage_adapter: Module used for queue storage and the parameter sent to the start_link function
- slot_name: Name of the replication slot for the WAL. It is automatically created if it does not exist.
- url: URL to connect to the database. If it is not set, the other parameters will be used instead
- username: Username to connect to the database
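How the four operator_queue_* parameters combine is not spelled out in this README. As a rough illustration only, the sketch below assumes a capped exponential backoff (first wait equal to the minimum, multiplied after each crash, never above the maximum). The module name BackoffSketch and the formula itself are assumptions, not the package's actual implementation.

```elixir
defmodule BackoffSketch do
  # Values mirror the documented defaults of the operator_queue_* parameters.
  @min_restart_time 500
  @max_restart_time 10_000
  @multiplicator 2
  @max_retries 5

  # Wait time in milliseconds before restart number `retry` (0-based),
  # assuming the wait grows geometrically and is capped at the maximum.
  def wait_time(retry) when is_integer(retry) and retry >= 0 do
    min(@min_restart_time * Integer.pow(@multiplicator, retry), @max_restart_time)
  end

  # Wait times for every restart allowed by @max_retries.
  def schedule do
    Enum.map(0..(@max_retries - 1), &wait_time/1)
  end
end

IO.inspect(BackoffSketch.schedule())
# prints [500, 1000, 2000, 4000, 8000]
```

Under this assumption, the default values never reach the 10000 ms cap within the 5 allowed restarts; a sixth restart would.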
For the package to work correctly you will have to specify, at least:
- The database connection parameters
- The :app_name
- The pubsub :name
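As an illustration, a minimal configuration covering only these required values could look like the sketch below. The application name :my_app, the database name and the MyApp.PubSub process name are placeholders (assumptions), to be replaced with the values of your own project.

```elixir
# config/config.exs
import Config

config :event_streamex,
  # Placeholder application name (assumption)
  app_name: :my_app,
  # Database connection parameters (placeholder values)
  database: "my_app_dev",
  hostname: "localhost",
  port: "5432",
  username: "postgres",
  password: "postgres",
  # MyApp.PubSub is assumed to be the name of your Phoenix.PubSub process
  pubsub: [adapter: Phoenix.PubSub, name: MyApp.PubSub]
```

All other parameters keep the default values listed above.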
Mark the entity for listening
To mark an entity for listening, you will have to do 2 things:
- Use the EventStreamex.Events module
- Set the tables for replication
Use the EventStreamex.Events module
defmodule MyApp.Blog.Post do
  use MyApp.Schema
  use EventStreamex.Events, name: :my_app, schema: "posts"

  import Ecto.Changeset

  alias MyApp.Blog.PostWithCommentsCount

  schema "posts" do
    field :title, :string
    field :body, :string

    # Microsecond precision, as recommended for :process_status_entity_field
    timestamps(type: :utc_datetime_usec)
  end

  @doc false
  def changeset(post, attrs) do
    post
    |> cast(attrs, [:id, :title, :body])
    |> validate_required([:title, :body])
  end
end
Set the tables for replication
For each table of your application, you will have to execute this SQL query (ideally in the migrations):
ALTER TABLE ONLY <table_name> REPLICA IDENTITY FULL;
Of course, you will replace <table_name> with the name of the table.
Any marked entity will tell PostgreSQL's WAL to emit events as soon as it is inserted, updated, or deleted.
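In an Ecto migration, the ALTER TABLE statement above could be run as in the following sketch. The migration module name is a placeholder, and the down step, which restores PostgreSQL's default replica identity, is an addition that this README does not prescribe.

```elixir
defmodule MyApp.Repo.Migrations.SetPostsReplicaIdentity do
  use Ecto.Migration

  def up do
    # Make the WAL carry the full old row on UPDATE and DELETE
    execute("ALTER TABLE ONLY posts REPLICA IDENTITY FULL;")
  end

  def down do
    # Back to PostgreSQL's default (primary key only)
    execute("ALTER TABLE ONLY posts REPLICA IDENTITY DEFAULT;")
  end
end
```

One such statement is needed for each table, so a project with several tables may prefer to group them in a single migration.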
Listen to events in live views
Now that our posts entity is subscribed to in the WAL, we can listen to its changes in a live view:
defmodule MyApp.PostLive.Index do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "post_with_comments_counts",
    subscriptions: [:unscoped]

  alias MyApp.Blog
  alias MyApp.Blog.Post

  @impl true
  def mount(params, session, socket) do
    {:ok, socket} = super(params, session, socket)

    {:ok, stream(socket, :posts, Blog.list_posts())}
  end

  @impl true
  def handle_params(params, url, socket) do
    {_res, socket} = super(params, url, socket)

    {:noreply, socket}
  end

  @impl true
  def handle_info({:on_insert, [], "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> stream_insert(:posts, post.new_record)}
  end

  @impl true
  def handle_info({:on_update, [], "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> stream_insert(:posts, post.new_record)}
  end

  @impl true
  def handle_info({:on_delete, [], "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> stream_delete(:posts, post.old_record)}
  end
end
Here, we listen to posts events in our live view.
The handle_info/2 callbacks are called with the kind of event received (:on_insert, :on_update, :on_delete) and the event.
You can then do whatever you need to update your view.
Events structure
An event looks like this:
%WalEx.Event{
  name: atom(),
  type: :insert | :update | :delete,
  source: %WalEx.Event.Source{
    name: String.t(),
    version: String.t(),
    db: String.t(),
    schema: String.t(),
    table: String.t(),
    columns: map()
  },
  new_record: map() | nil,
  old_record: map() | nil,
  changes: map() | nil,
  timestamp: DateTime.t(),
  lsn: {integer(), integer()}
}
I am using the WalEx package internally.
- name: The name of the entity (e.g. :comments for a table named comments)
- type: The type of event, one of insert, update and delete
- source: Information about the event:
  - name: "WalEx"
  - version: Current version of WalEx
  - db: The name of the database
  - schema: Mostly "public"
  - table: The name of the table (e.g. "comments")
  - columns: A map of fields with their type (e.g. %{"id": "integer", "message": "varchar"})
- new_record: The entity itself for insert and update events. nil for delete events.
- old_record: The entity itself for delete events. nil for insert and update events.
- changes: A map with the changes in the entity in update events, nil otherwise (see below)
- timestamp: The timestamp of the event as a DateTime
- lsn: A tuple containing information about the publication cursor
changes
When you receive an update event, you will also have the changes field set to a map containing the changes made to the entity by that update.
This map contains the changed fields as keys, and a map describing the change as value. This "change" map contains 2 fields:
- old_value: The value before the update
- new_value: The value after the update
For instance, let's say you have a comments entity with 4 fields: id, message, rating, timestamp.
You have a comment with these values:
%Comment{
  id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
  message: "This is my first comment",
  rating: 4,
  timestamp: "2024-07-23T12:00:00Z"
}
Now, the comment is updated with these new values:
%Comment{
  id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
  message: "This is (not) my first comment (anymore)",
  rating: 5,
  timestamp: "2024-07-23T12:00:00Z"
}
The event structure will look like this:
%WalEx.Event{
  name: :comments,
  type: :update,
  source: %WalEx.Event.Source{
    name: "WalEx",
    version: "4.4.0",
    db: "postgresql",
    schema: "public",
    table: "comments",
    columns: %{
      id: "uuid",
      message: "varchar",
      rating: "integer",
      timestamp: "datetime"
    }
  },
  new_record: %Comment{
    id: "dd4bc2ba-c7cc-4a05-a1c7-9f26cd9ab79f",
    message: "This is (not) my first comment (anymore)",
    rating: 5,
    timestamp: "2024-07-23T12:00:00Z"
  },
  old_record: nil,
  changes: %{
    message: %{
      old_value: "This is my first comment",
      new_value: "This is (not) my first comment (anymore)"
    },
    rating: %{
      old_value: 4,
      new_value: 5
    }
  },
  timestamp: "2024-08-25T13:13:30Z",
  lsn: {0, 0}
}
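To make the shape of the changes map concrete, here is a small sketch that derives such a map from two plain maps representing the record before and after the update. The ChangesSketch module is purely illustrative: it is not how EventStreamex or WalEx compute the field, only a way to reproduce the same structure.

```elixir
defmodule ChangesSketch do
  # Returns a map of changed fields, each described by its old and new value.
  # Fields whose value did not change are left out.
  def diff(old_record, new_record) when is_map(old_record) and is_map(new_record) do
    new_record
    |> Enum.filter(fn {field, new_value} -> Map.get(old_record, field) != new_value end)
    |> Map.new(fn {field, new_value} ->
      {field, %{old_value: Map.get(old_record, field), new_value: new_value}}
    end)
  end
end

old = %{id: "dd4bc2ba", message: "This is my first comment", rating: 4}
new = %{id: "dd4bc2ba", message: "This is (not) my first comment (anymore)", rating: 5}

IO.inspect(ChangesSketch.diff(old, new))
```

With the two comments from the example, only message and rating appear in the result, since id is unchanged.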
Update other entities based on events
Now, let's assume that we also have a comments entity linked to a post, and that we want to display our posts with the number of comments they have.
In event streaming we will create another entity (let's say post_with_comments_count), and update it when we receive events from posts and comments.
This entity will look like this:
- id: Its id (same one as the post id)
- title: Same one as the post title
- body: Same one as the post body
- comments_count: The number of comments of the post
defmodule MyApp.Operators.PostWithCommentsCountOperator do
  use EventStreamex.Operators.Operator,
    schema: "post_with_comments_counts",
    repo: MyApp.Repo

  on_event(
    all_events(),
    entities(["comments", "posts"]),
    query(
      """
      MERGE INTO post_with_comments_counts dest
      USING (
        SELECT
          p.id AS id,
          p.title AS title,
          p.body AS body,
          p.inserted_at AS inserted_at,
          p.updated_at AS updated_at,
          (SELECT COUNT(c.id) FROM comments c WHERE c.post_id = p.id) AS comments_count
        FROM posts p
        WHERE p.id = $1
      ) src ON src.id = dest.id
      WHEN NOT MATCHED THEN
        INSERT (id, title, body, inserted_at, updated_at, comments_count)
        VALUES (src.id, src.title, src.body, src.inserted_at, src.updated_at, src.comments_count)
      WHEN MATCHED THEN
        UPDATE SET
          title = src.title,
          body = src.body,
          inserted_at = src.inserted_at,
          updated_at = src.updated_at,
          comments_count = src.comments_count;
      """,
      id([{"comments", :post_id}, {"posts", :id}]),
      delete_with(["posts"])
    )
  )
end
Using an operator will allow us to insert/update/delete an entity based on events.
Here, we listen to posts and comments and, for each event, we execute the MERGE query.
This query will create the post_with_comments_count when a post is created, and update it when the post is updated.
It will also update the comments_count each time there is a new comment, or a comment has been deleted.
The $1 parameter in the query will be replaced by the field in the listened entities that corresponds to the id of the post_with_comments_count.
In this example, it's the post ID, which is found in the post_id field in comments and in the id field in posts.
That comes from this line: id([{"comments", :post_id}, {"posts", :id}]).
delete_with(["posts"]) will tell the operator that a post deletion must also delete the post_with_comments_count.
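The id mapping can be pictured with the following sketch, which looks up, for a given table, the field whose value would be bound to $1. The IdMappingSketch module and its query_param/2 function are hypothetical helpers written for this explanation; they are not part of the EventStreamex API.

```elixir
defmodule IdMappingSketch do
  # Same pairs as in id([{"comments", :post_id}, {"posts", :id}])
  @id_fields %{"comments" => :post_id, "posts" => :id}

  # Returns the value that would be used as $1 for an event on `table`,
  # given the record carried by the event as a plain map.
  def query_param(table, record) when is_binary(table) and is_map(record) do
    case Map.fetch(@id_fields, table) do
      {:ok, field} -> {:ok, Map.get(record, field)}
      :error -> {:error, :unknown_entity}
    end
  end
end

# A comment event and a post event both resolve to the same post ID
IO.inspect(IdMappingSketch.query_param("comments", %{id: "c1", post_id: "p1"}))
IO.inspect(IdMappingSketch.query_param("posts", %{id: "p1", title: "Hello"}))
```

Both calls resolve to the ID of the same post, which is why a single MERGE query can serve events coming from either table.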
If you have more complex scenarios, you can also use a function instead of query/3:
on_event(
  all_events(),
  entities(["comments", "posts"]),
  fn event ->
    {:ok, res} =
      event
      |> MyQuery.do_some_queries()
      |> MyQuery.then_another_query()

    {:ok, res}
  end
)
Why use event streaming?
Although using event streaming with EventStreamex looks easy, why bother using event streaming in the first place?
The issue without event streaming
Let's say you have a posts entity which contains:
- title: a title
- body: a body
You also have a comments entity which contains:
- post_id: The ID of the post it is attached to
- comment: The comment content
When you show a list of your posts you might want to show the number of comments each post has.
To do so you will be tempted to make a SQL query with a subquery that will count the comments of your posts.
This is pretty trivial:
SELECT
  p.id AS id,
  p.title AS title,
  p.body AS body,
  (SELECT COUNT(*) FROM comments c WHERE c.post_id = p.id) AS comments_count
FROM posts p;
But it has several flaws. Let's look at them in more detail.
Your queries will become slower and slower over time
Because subqueries are not free. They take more time to execute than a simple query with no join and no subquery.
So, the more posts and comments you have, the slower the query will be.
You will end up adding complexity in your queries
As your app evolves, you will probably create new entities related to your posts and comments. Thus, you will add more and more subqueries and joins to your queries, making them slower and harder to maintain...
Realtime will become a mess to maintain
If you want to handle realtime events, like updating the number of comments when a new one is created, it will be a mess to maintain over time because you will end up subscribing to a lot of events as you link your posts to other entities.
And you will probably forget to subscribe to some of the events, making your app inconsistent (some features will be in realtime and others won't be).
Almost impossible to maintain recursivity
If your comments can have subcomments it will be almost impossible to maintain the comments count because of the recursivity it implies.
Depending on entities not directly related to the base entity
Let's say you create another entity named ratings which saves each rating given to a specific comment.
It would look like this:
- comment_id: The ID of the comment related to this rating
- rating: The given rating
Now imagine you want to display, for every comment, the average rating and the total count of ratings for this comment.
Here again, you could solve this with some subqueries.
But what if you want to show the average rating of all comments in the post, so that users can see that a post has received a lot of useful comments?
I don't even want to think about the query that would require...
Benefits from using event streaming
Let's come back to our posts and comments.
Now let's imagine that instead of our subquery to count the comments we use an event streaming architecture and create another entity whose goal is specifically to show the posts with their comments count.
This post_with_comments_counts entity will look like this:
- title: a title
- body: a body
- comments_count: the number of comments of the post
And this entity will listen to new comments to update its comments count: when a new comment is created it will increase the number of comments of the related post by 1.
With EventStreamex it would look like this:
defmodule MyApp.Operators.PostWithCommentsCountOperator do
  use EventStreamex.Operators.Operator,
    schema: "post_with_comments_counts",
    repo: MyApp.Repo

  on_event(
    all_events(),
    entities(["comments", "posts"]),
    query(
      "MERGE INTO post_with_comments_counts dest
      USING (
        SELECT
          p.id AS id,
          p.title AS title,
          p.body AS body,
          p.inserted_at AS inserted_at,
          p.updated_at AS updated_at,
          (SELECT COUNT(c.id) FROM comments c WHERE c.post_id = p.id) AS comments_count
        FROM posts p
        WHERE p.id = $1
      ) src ON src.id = dest.id
      WHEN NOT MATCHED THEN
        INSERT (id, title, body, inserted_at, updated_at, comments_count)
        VALUES (src.id, src.title, src.body, src.inserted_at, src.updated_at, src.comments_count)
      WHEN MATCHED THEN
        UPDATE SET
          title = src.title,
          body = src.body,
          inserted_at = src.inserted_at,
          updated_at = src.updated_at,
          comments_count = src.comments_count;",
      id([{"comments", :post_id}, {"posts", :id}]),
      delete_with(["posts"])
    )
  )
end
Here, we use the EventStreamex.Operators.Operator module to listen for posts and comments in all kinds of events (insert, update, delete).
And each time a post or a comment is inserted, updated, or deleted, we perform this MERGE query to create/update our post_with_comments_counts.
The MERGE query will insert a new post_with_comments_counts row if it does not exist and update it if it already exists.
id/1 is used to map the fields of the listened entities to the id of the post_with_comments_counts entity (id for a post and post_id for a comment).
Finally, delete_with/1 tells the operator which entities' deletion should also delete the related post_with_comments_counts entity.
As you can see, this MERGE query also uses a subquery, so why is it better than our first implementation?
This is a naive implementation
The query/3 macro is provided for simplicity.
But you can also replace it with a function to do things as you think best (for instance, optimizing the queries according to the type of event and entity received).
This function will receive the event, allowing you to do whatever you want with it.
The query is executed in the background
In our first example, the query was executed every time someone wanted to display the list of posts.
In our event streaming example, the query is executed in the background each time there is an event on a post or a comment.
To display your list of posts you will just have to do:
SELECT * FROM post_with_comments_counts;
Which is simpler and faster than our former query.
Realtime is easy
You can use the module EventStreamex.EventListener to easily add realtime in your live view applications.
Because we are using PostgreSQL WAL events, every time something happens on an entity (post_with_comments_counts as well), an event is sent.
This event will be processed by operators listening to it (like our operator here does for posts and comments), and it will also be sent to a pubsub processor.
The EventStreamex.EventListener now just has to listen to this pubsub processor to notify your live view that the entity you are showing has been updated.
Everything can now be realtime, without being hard to maintain.
A better pattern to model your data
With this approach you keep your entities consistent and simple. And you create specialized entities for what you want to show in your views.
A post will stay a post with its title and body fields and a comment will stay a comment with a comment field.
post_with_comments_count is just a specialized entity used for your view.
This keeps your data and APIs consistent and simple with explicit fields and names.
For instance, let's say you have a users entity with a login and a password field.
But later on, you realize that you need to store some information about your users, like their name, their address, and so on.
You will be tempted to update your users entity to add these new fields:
- login
- password
- address
- name
But still, in your application, when the moment comes to create your user, you only need the login and password.
So what do you do? You make the other fields optional in your API and give them a default value (empty string or null).
And in the view that allows users to specify more information about themselves (name and address), you will either:
- Use the same API endpoint as for the users creation
- Or create a new dedicated API endpoint
But both approaches are bad practices!
In the first approach, your API expects 2 mandatory fields (login and password) and 2 optional fields (name and address).
But here, there is no sense in sending the login and password.
And maybe you would like the name and address fields to be mandatory.
But because your API declares these fields as optional, you will have to check for them on the client side.
Which is a bad practice.
In the second approach, you will create an API endpoint for the user profile.
But this API will ultimately update the users entity.
You end up with hard-to-understand APIs because their names do not match the entity being modified.
A better approach would be to have a users entity with:
- login
- password
And its own API endpoint /users with only these two fields.
And you would have a user_profiles entity with:
- user_id
- name
- address
And its own API endpoint /users/{user_id}/user_profiles with only the name and address fields.
And what if you need to display all of this information in your view?
Well, you use an operator like we did for the post_with_comments_counts, named, for instance, user_with_profiles, and listening to users and user_profiles events.
This way, you keep your data and API consistent.
How to handle asynchronicity of entity creation
Event streaming is nice, but it makes create/update/delete operations asynchronous for derived entities, which can be a nightmare to handle correctly.
Let's say you have a LiveView application showing a list of post_with_comments_counts, which is a derived entity from the posts entity, containing, in addition to the posts fields, a comments_count field.
From this view, we can also create a new post.
But we don't create a post_with_comments_counts directly.
Instead, we must create a posts entity, which, in turn, will trigger the creation of the post_with_comments_counts using event streaming.
What we want to do is to redirect to the post detail page when the post is created.
But we have a problem!
Because the creation of the posts entity is synchronous, we can redirect as soon as it is created.
But that does not mean the post_with_comments_counts entity will already exist, and it is this entity we need to show.
To handle this scenario and be able to display the post_with_comments_counts entity as soon as it is created we have 3 solutions:
Wait for the entity to be created before redirecting
To ensure you redirect to an entity that exists, you can actually wait for it to be created.
In order to do so, you will have to listen to this entity to know when it has been created.
You can use the subscribe_entity/4 function for that purpose.
defmodule MyApp.PostLive.Index do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "post_with_comments_counts",
    subscriptions: [:unscoped]

  alias MyApp.Blog
  alias MyApp.Blog.Post

  @impl true
  def mount(params, session, socket) do
    {:ok, socket} = super(params, session, socket)

    {:ok, stream(socket, :posts, Blog.list_posts())}
  end

  @impl true
  def handle_params(params, url, socket) do
    {_res, socket} = super(params, url, socket)

    {:noreply, apply_action(socket, socket.assigns.live_action, params)}
  end

  defp apply_action(socket, :new, _params) do
    socket
    |> assign(:page_title, "New Post")
    |> assign(:post, %Post{})
  end

  defp apply_action(socket, :index, _params) do
    socket
    |> assign(:page_title, "Listing Posts")
    |> assign(:post, nil)
  end

  @impl true
  def handle_info({MyApp.PostLive.FormComponent, {:saved, post}}, socket) do
    {:noreply,
     socket
     |> subscribe_entity("post_with_comments_counts", :direct, %{"id" => post.id})}
  end

  @impl true
  def handle_info({:on_insert, :direct, "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> unsubscribe_entity("post_with_comments_counts", :direct)
     |> push_navigate(to: "/posts/#{post.new_record.id}")}
  end

  @impl true
  def handle_info({_, :direct, "post_with_comments_counts", _post}, socket) do
    {:noreply, socket}
  end

  @impl true
  def handle_info({:on_insert, [], "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> stream_insert(:posts, post.new_record)}
  end

  @impl true
  def handle_info({:on_update, [], "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> stream_insert(:posts, post.new_record)}
  end

  @impl true
  def handle_info({:on_delete, [], "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> stream_delete(:posts, post.old_record)}
  end
end
Here, our live view listens for :unscoped post_with_comments_counts events to keep the index view updated in realtime (nothing new).
This code is more interesting:
@impl true
def handle_info({MyApp.PostLive.FormComponent, {:saved, post}}, socket) do
  {:noreply,
   socket
   |> subscribe_entity("post_with_comments_counts", :direct, %{"id" => post.id})}
end
This function gets called when a post has been created.
Inside it, instead of redirecting immediately, we call the subscribe_entity/4 function to listen for the post_with_comments_counts with a specific ID: the ID of the newly created post.
That means that as soon as this post_with_comments_counts entity is created, we will be notified:
@impl true
def handle_info({:on_insert, :direct, "post_with_comments_counts", post}, socket) do
  {:noreply,
   socket
   |> unsubscribe_entity("post_with_comments_counts", :direct)
   |> push_navigate(to: "/posts/#{post.new_record.id}")}
end
When the entity is created, we unsubscribe from the channel (this is optional, as it will automatically unsubscribe as soon as the live view terminates), and we redirect to the detail page.
And we know that, at this moment, the post_with_comments_counts entity exists.
Handle entity not found
Another solution is to redirect immediately without waiting for the post_with_comments_counts to be created, and to handle the case where the entity is not found yet.
defmodule MyApp.PostLive.Show do
  use MyApp, :live_view

  use EventStreamex.EventListener,
    schema: "post_with_comments_counts",
    subscriptions: [:direct]

  alias MyApp.Blog

  @impl true
  def mount(params, session, socket) do
    super(params, session, socket)
  end

  @impl true
  def handle_params(%{"id" => id} = params, url, socket) do
    {_res, socket} = super(params, url, socket)

    # Returns nil (no exception) when the entity does not exist yet
    post = Blog.get_post(id)

    {:noreply,
     socket
     |> assign(:id, id)
     |> assign(:post, post)}
  end

  @impl true
  def handle_info({:on_insert, :direct, "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> assign(:post, post.new_record)}
  end

  @impl true
  def handle_info({:on_update, :direct, "post_with_comments_counts", post}, socket) do
    {:noreply,
     socket
     |> assign(:post, post.new_record)}
  end

  @impl true
  def handle_info({:on_delete, :direct, "post_with_comments_counts", _post}, socket) do
    {:noreply, socket |> put_flash(:warning, "This post has just been deleted")}
  end
end
Here we fetch the entity (but without triggering an exception if it does not exist), and we replace it if we receive the :on_insert event for that entity.
You will just have to ensure your view shows a loader or something similar until the entity is created.
Handling async with APIs
We saw 2 solutions we could use with a live view. But what if you develop a mobile app and use an API?
If you have an API endpoint to create a new post (POST /posts), and as soon as your application calls it, it redirects to another view requesting the detail of the created post_with_comments_counts (GET /post_with_comments_counts), then you may receive a 404 error because of the async creation of the post_with_comments_counts entity.
In order to avoid that, you could implement a mechanism that would return, along with the newly created post, the post creation timestamp.
Then, in the GET /post_with_comments_counts endpoint, you would send along the timestamp you got from the POST /posts endpoint.
And in your controller you would check if the entity has been processed before returning the requested post_with_comments_counts.
You can use EventStreamex.Operators.ProcessStatus.processed?/2 in order to do so.
For instance, when you create a post, you receive this header from the API:
X-Event-Streamex-Entity-Timestamp: "posts/1422057007123000"
def create(conn, %{"post" => post_params}) do
  with {:ok, %Post{} = post} <- Blog.create_post(post_params) do
    conn
    |> put_status(:created)
    |> put_resp_header("location", ~p"/api/posts/#{post}")
    |> put_resp_header(
      "x-event-streamex-entity-timestamp",
      "posts/#{EventStreamex.Operators.EntityProcessed.to_unix_timestamp!(post.updated_at)}"
    )
    |> render(:show, post: post)
  end
end
(You can use the :process_status_entity_field config parameter to set the field to use in your entities to know the last updated time. You should return the field's value in your API, and use this value in the processed?/2 function.)
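The example header value has 16 digits, which suggests a Unix timestamp in microseconds. Under that assumption (it is inferred from the example, not confirmed by this README), a value of the same shape can be built with the standard library alone, as in this sketch. EntityTimestampSketch is a hypothetical module used only for illustration.

```elixir
defmodule EntityTimestampSketch do
  # Builds a value shaped like "<entity>/<unix timestamp in microseconds>",
  # assuming microseconds is the unit expected by processed?/2.
  def encode(entity, %DateTime{} = updated_at) when is_binary(entity) do
    "#{entity}/#{DateTime.to_unix(updated_at, :microsecond)}"
  end
end

# A microsecond-precision value, as stored by a :utc_datetime_usec field
updated_at = ~U[2024-07-23 12:00:00.000000Z]
IO.puts(EntityTimestampSketch.encode("posts", updated_at))
```

This also shows why the entity field needs microsecond precision: a value truncated to seconds would produce a timestamp that does not match the one recorded for the event.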
You will send this header when you call the GET /post_with_comments_counts endpoint.
And the controller will immediately check if the entity has been processed:
EventStreamex.Operators.ProcessStatus.processed?("posts", 1422057007123000)
If the entity has not been processed yet, then you know the post_with_comments_counts entity has not been created yet (it works the same for updated entities).
So you can immediately return a 425 response for instance, informing the client to retry later.
If the entity has been processed, then you fetch it and return it. If you get a 404 error, this time, that means the entity did not exist.
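On the client side, "retry later" can be as simple as the loop sketched here. It is deliberately independent of any HTTP library: fetch is any zero-arity function returning {:ok, body} or {:error, status}, and RetrySketch, the number of attempts and the delay are all illustrative assumptions.

```elixir
defmodule RetrySketch do
  # Calls `fetch` and retries while it answers with a 425 status,
  # waiting `delay_ms` milliseconds between attempts.
  def fetch_with_retry(fetch, attempts \\ 5, delay_ms \\ 100)

  # No attempts left: give up
  def fetch_with_retry(_fetch, 0, _delay_ms), do: {:error, :not_processed_in_time}

  def fetch_with_retry(fetch, attempts, delay_ms) when attempts > 0 do
    case fetch.() do
      {:error, 425} ->
        Process.sleep(delay_ms)
        fetch_with_retry(fetch, attempts - 1, delay_ms)

      # Success, or any other error such as a real 404: return it as is
      other ->
        other
    end
  end
end
```

A 404 is returned to the caller immediately, since at that point it means the entity really does not exist.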
You can do that in a plug in your API scope:
defmodule MyApp.Plugs.EventStreamexEntityTimestamp do
  import Plug.Conn

  @header_name "x-event-streamex-entity-timestamp"

  def init(default), do: default

  def call(%Plug.Conn{} = conn, _default) do
    case get_req_header(conn, @header_name) do
      [entity_timestamp] ->
        case decode_entity_timestamp(entity_timestamp) do
          {:ok, entity, timestamp} ->
            if EventStreamex.Operators.ProcessStatus.processed?(entity, timestamp) do
              conn
            else
              # Not processed yet: tell the client to retry later
              conn
              |> put_status(425)
              |> resp(425, "Entity not processed yet")
              |> halt()
            end

          # Malformed header value: ignore it
          _ ->
            conn
        end

      # Header absent (or sent several times): nothing to check
      _ ->
        conn
    end
  end

  # Splits "posts/1422057007123000" into the entity name and the timestamp
  def decode_entity_timestamp(entity_timestamp) do
    case String.split(entity_timestamp, "/") do
      [entity, timestamp_str] ->
        case Integer.parse(timestamp_str, 10) do
          {timestamp, _} ->
            {:ok, entity, timestamp}

          :error ->
            {:error, :not_integer}
        end

      _ ->
        {:error, :bad_format}
    end
  end
end
Of course you can implement it the way you want, but you've got the idea.
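For completeness, wiring such a plug into a Phoenix router could look like the sketch below. The MyAppWeb.Router module, the :api pipeline and the PostWithCommentsCountController name are assumptions following common Phoenix conventions, not something this package provides.

```elixir
defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
    # Answers 425 while the entity named in the header is not processed yet
    plug MyApp.Plugs.EventStreamexEntityTimestamp
  end

  scope "/api", MyAppWeb do
    pipe_through :api

    # Hypothetical controller serving the derived entity
    resources "/post_with_comments_counts", PostWithCommentsCountController, only: [:show]
  end
end
```

Placing the plug in the pipeline means every endpoint of the scope honours the header, without any change to the controllers.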
Tests
To test the package you can run:
mix test
Some tests will need access to a PostgreSQL database.
Here are the credentials used by the tests:
- Database: postgres
- Hostname: localhost
- Password: ""
- Port: 5432
- Username: postgres