Constraints and Upserts
In this guide we will learn how to use constraints and upserts. To showcase those features, we will work through a practical scenario: a many-to-many relationship between posts and tags.
put_assoc vs cast_assoc
Imagine we are building an application that has blog posts, and such posts may have many tags. Not only that, a given tag may also belong to many posts. This is a classic scenario where we would use many_to_many associations. Our migrations would look like:
create table(:posts) do
  add :title, :string
  add :body, :text
  timestamps()
end

create table(:tags) do
  add :name, :string
  timestamps()
end

create unique_index(:tags, [:name])

create table(:posts_tags, primary_key: false) do
  add :post_id, references(:posts)
  add :tag_id, references(:tags)
end
Note we added a unique index to the tag name because we don't want to have duplicated tags in our database. It is important to add an index at the database level instead of using a validation since there is always a chance two tags with the same name would be validated and inserted simultaneously, passing the validation and leading to duplicated entries.
Now let's also imagine we want the user to input such tags as a comma-separated list of words, such as: "elixir, erlang, ecto". Once this data is received in the server, we will break it apart into multiple tags and associate them to the post, creating any tag that does not yet exist in the database.
While the constraints above sound reasonable, they are exactly what gets us into trouble with cast_assoc/3. The cast_assoc/3 changeset function was designed to receive external parameters and compare them with the associated data in our structs. To do so correctly, Ecto requires tags to be sent as a list of maps. We can see an example of this in Polymorphic associations with many to many. However, here we expect tags to be sent in a string separated by commas.
Furthermore, cast_assoc/3 relies on the primary key field for each tag sent in order to decide if it should be inserted, updated or deleted. Again, because the user is simply passing a string, we don't have the ID information at hand.
When we can't cope with cast_assoc/3, it is time to use put_assoc/4. In put_assoc/4, we give Ecto structs or changesets instead of parameters, giving us the ability to manipulate the data as we want. Let's define the schema and the changeset function for a post which may receive tags as a string:
defmodule MyApp.Post do
  use Ecto.Schema

  schema "posts" do
    field :title
    field :body

    many_to_many :tags, MyApp.Tag,
      join_through: "posts_tags",
      on_replace: :delete

    timestamps()
  end

  def changeset(struct, params \\ %{}) do
    struct
    |> Ecto.Changeset.cast(params, [:title, :body])
    |> Ecto.Changeset.put_assoc(:tags, parse_tags(params))
  end

  defp parse_tags(params) do
    (params["tags"] || "")
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
    |> Enum.map(&get_or_insert_tag/1)
  end

  # Repo is assumed to refer to the application's repository (e.g. via an alias).
  defp get_or_insert_tag(name) do
    Repo.get_by(MyApp.Tag, name: name) ||
      Repo.insert!(%MyApp.Tag{name: name})
  end
end
In the changeset function above, we moved all the handling of tags to a separate function, called parse_tags/1, which checks for the parameter, breaks each tag apart via String.split/2, then removes any leftover whitespace with String.trim/1, rejects any empty string and finally checks if the tag exists in the database or not, creating one in case none exists.
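To see the string handling in isolation, here is a minimal sketch of the same pipeline without the database lookup. The TagInput module name and the sample inputs are assumptions made for illustration; only the String and Enum calls come from the code above.

```elixir
defmodule TagInput do
  # Hypothetical helper: mirrors the first four steps of parse_tags/1,
  # stopping before any database access.
  def names(params) do
    (params["tags"] || "")
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
  end
end

IO.inspect(TagInput.names(%{"tags" => "elixir, erlang, , ecto "}))
# prints ["elixir", "erlang", "ecto"]

IO.inspect(TagInput.names(%{}))
# prints []
```

Note that a missing "tags" key and a string made only of commas and spaces both end up as an empty list, so the post is simply left without tags.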
The parse_tags/1 function is going to return a list of MyApp.Tag structs which are then passed to put_assoc/4. By calling put_assoc/4, we are telling Ecto those should be the tags associated to the post from now on. In case a previous tag was associated to the post and not given in put_assoc/4, Ecto will invoke the behaviour defined in the :on_replace option, which we have set to :delete. The :delete behaviour will remove the association between the post and the removed tag from the database.
And that's all we need to use many_to_many associations with put_assoc/4. put_assoc/4 is very useful when we want to have more explicit control over our associations, and it also works with has_many, belongs_to and all other association types.
However, our code is not yet ready for production. Let's see why.
Constraints and race conditions
Remember we added a unique index to the tag :name column when creating the tags table. We did so to protect us from having duplicate tags in the database.
By adding the unique index and then using get_by with an insert! to get or insert a tag, we introduced a potential error in our application. If two posts are submitted at the same time with the same tag, there is a chance both submissions will check if the tag exists at the same time, leading both to believe there is no such tag in the database. When that happens, only one of the submissions will succeed while the other one will fail. That's a race condition: your code will error from time to time, only when certain conditions are met. And those conditions are time sensitive.
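The interleaving described above can be replayed deterministically with a toy model. The sketch below is not Ecto: RaceSketch and its in-memory MapSet are assumptions standing in for the tags table and its unique index, and the two "requests" are simply run step by step in the unlucky order.

```elixir
defmodule RaceSketch do
  # Toy stand-in for the tags table: a MapSet of names that, like the
  # unique index, refuses to store the same name twice. Not Ecto.
  def exists?(table, name), do: MapSet.member?(table, name)

  def insert(table, name) do
    if MapSet.member?(table, name) do
      {:error, :unique_violation}
    else
      {:ok, MapSet.put(table, name)}
    end
  end

  # Replays one unlucky interleaving of two requests, A and B.
  def run do
    table = MapSet.new()

    # Both requests look the tag up before either has inserted it.
    a_found? = exists?(table, "elixir")
    b_found? = exists?(table, "elixir")

    # A inserts first and succeeds; B then hits the unique index.
    {:ok, table} = insert(table, "elixir")
    b_result = insert(table, "elixir")

    {a_found?, b_found?, b_result}
  end
end

IO.inspect(RaceSketch.run())
# prints {false, false, {:error, :unique_violation}}
```

Both lookups report that the tag is missing, yet only the first insert can succeed. In the real code the second request would not get a tuple back: its insert! call would raise.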
Luckily Ecto gives us a mechanism to handle constraint errors from the database.
Checking for constraint errors
Since our get_or_insert_tag(name) function can fail when another request inserts the same tag between our lookup and our insert, we need to handle such scenarios accordingly. Let's rewrite it taking race conditions into account:
defp get_or_insert_tag(name) do
  %MyApp.Tag{}
  |> Ecto.Changeset.change(name: name)
  |> Ecto.Changeset.unique_constraint(:name)
  |> Repo.insert()
  |> case do
    {:ok, tag} -> tag
    {:error, _} -> Repo.get_by!(MyApp.Tag, name: name)
  end
end
Instead of inserting the tag directly, we now build a changeset, which allows us to use the unique_constraint annotation. Now if the Repo.insert operation fails because the unique index for :name is violated, Ecto won't raise, but return an {:error, changeset} tuple. Therefore, if Repo.insert succeeds, it is because the tag was saved, otherwise the tag already exists, which we then fetch with Repo.get_by!.
While the mechanism above fixes the race condition, it is quite an expensive one: we need to perform two queries for every tag that already exists in the database, the (failed) insert and then the repository lookup. Given that's the most common scenario, we may want to rewrite it to the following:
defp get_or_insert_tag(name) do
  Repo.get_by(MyApp.Tag, name: name) ||
    maybe_insert_tag(name)
end

defp maybe_insert_tag(name) do
  %MyApp.Tag{}
  |> Ecto.Changeset.change(name: name)
  |> Ecto.Changeset.unique_constraint(:name)
  |> Repo.insert()
  |> case do
    {:ok, tag} -> tag
    {:error, _} -> Repo.get_by!(MyApp.Tag, name: name)
  end
end
The above performs 1 query for every tag that already exists, 2 queries for every new tag and possibly 3 queries in the case of race conditions. While the above would perform slightly better on average, Ecto has a better option in store.
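As a rough back-of-the-envelope comparison of the two versions above, the sketch below counts queries for a post with 10 tags, 8 of which already exist. The QueryCount module and the 8/2 split are assumptions for illustration, and race conditions are ignored.

```elixir
defmodule QueryCount do
  # Insert first, look up on failure: 2 queries per existing tag, 1 per new tag.
  def insert_first(existing, new), do: existing * 2 + new

  # Look up first, insert when missing: 1 query per existing tag, 2 per new tag.
  def lookup_first(existing, new), do: existing + new * 2
end

IO.inspect(QueryCount.insert_first(8, 2)) # prints 18
IO.inspect(QueryCount.lookup_first(8, 2)) # prints 12
```

The lookup-first version wins whenever most tags already exist, but both grow linearly with the number of tags.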
Upserts
Ecto supports the so-called "upsert" command, which is an abbreviation for "update or insert". The idea is that we try to insert a record and, in case it conflicts with an existing entry, for example due to a unique index, we can choose how we want the database to act: by raising an error (the default behaviour), ignoring the insert (no error) or updating the conflicting database entries.
An "upsert" in Ecto is done with the :on_conflict option. Let's rewrite get_or_insert_tag(name) once more but this time using the :on_conflict option. Remember that PostgreSQL supports "upsert" only from version 9.5 onwards, so make sure you are up to date.
Your first try in using :on_conflict may be by setting it to :nothing, as below:
defp get_or_insert_tag(name) do
  Repo.insert!(
    %MyApp.Tag{name: name},
    on_conflict: :nothing
  )
end
While the above won't raise an error in case of conflicts, it also won't update the struct given, so it will return a tag without ID. One solution is to force an update to happen in case of conflicts, even if the update is about setting the tag name to its current name. In such cases, PostgreSQL also requires the :conflict_target option to be given, which is the column (or a list of columns) on which we are expecting the conflict to happen:
defp get_or_insert_tag(name) do
  Repo.insert!(
    %MyApp.Tag{name: name},
    on_conflict: [set: [name: name]],
    conflict_target: :name
  )
end
And that's it! We try to insert a tag with the given name and if such a tag already exists, we tell Ecto to update its name to the current value, updating the tag and fetching its id. While the above is certainly a step up from all solutions so far, it still performs one query per tag. If 10 tags are sent, we will perform 10 queries. Can we further improve this?
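To make the difference between the two :on_conflict settings concrete, here is a small in-memory model of what each one hands back. It is only a sketch under stated assumptions: UpsertModel is not Ecto, the "table" is a plain map from tag name to id, and the :nothing and :set_name atoms are made-up labels for the two behaviours described above.

```elixir
defmodule UpsertModel do
  # Toy stand-in for the tags table: a map from tag name to id.
  # This is NOT Ecto; it only mimics the return values described above.

  # Mimics `on_conflict: :nothing`: on conflict nothing is written,
  # so there is no id to read back.
  def insert(table, name, :nothing) do
    if Map.has_key?(table, name) do
      {%{id: nil, name: name}, table}
    else
      insert_new(table, name)
    end
  end

  # Mimics `on_conflict: [set: [name: name]]` with `conflict_target: :name`:
  # the existing row is "updated", so its id comes back.
  def insert(table, name, :set_name) do
    case Map.fetch(table, name) do
      {:ok, id} -> {%{id: id, name: name}, table}
      :error -> insert_new(table, name)
    end
  end

  defp insert_new(table, name) do
    id = map_size(table) + 1
    {%{id: id, name: name}, Map.put(table, name, id)}
  end
end

table = %{"elixir" => 1}
{ignored, _table} = UpsertModel.insert(table, "elixir", :nothing)
{updated, _table} = UpsertModel.insert(table, "elixir", :set_name)

IO.inspect(ignored.id) # prints nil
IO.inspect(updated.id) # prints 1
```

A tag without an id cannot be associated to the post, which is why the no-op update is worth its cost here.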
Upserts and insert_all
Ecto accepts the :on_conflict option not only in Ecto.Repo.insert/2 but also in the Ecto.Repo.insert_all/3 function. This means we can build one query that attempts to insert all missing tags and then another query that fetches all of them at once. Let's see what our Post schema will look like after those changes:
defmodule MyApp.Post do
  use Ecto.Schema

  # We need to import Ecto.Query
  import Ecto.Query

  # Schema is the same
  schema "posts" do
    field :title
    field :body

    many_to_many :tags, MyApp.Tag,
      join_through: "posts_tags",
      on_replace: :delete

    timestamps()
  end

  # Changeset is the same
  def changeset(struct, params \\ %{}) do
    struct
    |> Ecto.Changeset.cast(params, [:title, :body])
    |> Ecto.Changeset.put_assoc(:tags, parse_tags(params))
  end

  # Parse tags has slightly changed
  defp parse_tags(params) do
    (params["tags"] || "")
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
    |> insert_and_get_all()
  end

  defp insert_and_get_all([]) do
    []
  end

  defp insert_and_get_all(names) do
    timestamp =
      NaiveDateTime.utc_now()
      |> NaiveDateTime.truncate(:second)

    placeholders = %{timestamp: timestamp}

    maps =
      Enum.map(names, &%{
        name: &1,
        inserted_at: {:placeholder, :timestamp},
        updated_at: {:placeholder, :timestamp}
      })

    Repo.insert_all(MyApp.Tag, maps, placeholders: placeholders, on_conflict: :nothing)

    Repo.all(from t in MyApp.Tag, where: t.name in ^names)
  end
end
Instead of getting and inserting each tag individually, the code above works on all tags at once, first by building a list of maps which is given to insert_all. Then we look up all tags with the given names. Regardless of how many tags are sent, we will perform only 2 queries, unless no tag is sent, in which case we promptly return an empty list. This solution is only possible thanks to the :on_conflict option, which guarantees insert_all won't fail in case a unique index is violated, such as from duplicate tag names. Remember, insert_all won't autogenerate values like timestamps. That's why we define a timestamp placeholder and reuse it across the inserted_at and updated_at fields.
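To make the shape of the arguments given to insert_all concrete, the sketch below builds the same list of maps and the same placeholder map without touching the database. The InsertAllArgs module, its build/2 function and the fixed timestamp are assumptions made for illustration.

```elixir
defmodule InsertAllArgs do
  # Hypothetical helper: returns {rows, placeholders} in the shape the
  # code above passes to insert_all. The clock is injectable so the
  # result is reproducible.
  def build(names, now \\ NaiveDateTime.utc_now()) do
    timestamp = NaiveDateTime.truncate(now, :second)
    placeholders = %{timestamp: timestamp}

    rows =
      Enum.map(names, fn name ->
        %{
          name: name,
          # Each row refers to the shared value by key instead of repeating it.
          inserted_at: {:placeholder, :timestamp},
          updated_at: {:placeholder, :timestamp}
        }
      end)

    {rows, placeholders}
  end
end

{rows, placeholders} =
  InsertAllArgs.build(["elixir", "ecto"], ~N[2024-01-01 12:00:00.123456])

IO.inspect(length(rows))
# prints 2

IO.inspect(placeholders)
# prints %{timestamp: ~N[2024-01-01 12:00:00]}
```

However many tags are sent, the timestamp appears once in the placeholder map and every row only carries a reference to it.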
Finally, keep in mind that we haven't used transactions in any of the examples so far. That decision was deliberate as we relied on the fact that getting or inserting tags is an idempotent operation, i.e. we can repeat it many times for a given input and it will always give us the same result back. Therefore, even if we fail to insert the post into the database due to a validation error, the user will be free to resubmit the form and we will just attempt to get or insert the same tags once again. The downside of this approach is that tags will be created even if creating the post fails, which means some tags may not have posts associated to them. In case that's not desired, the whole operation could be wrapped in a transaction or modeled with Ecto.Multi.