Dataloader.Ecto (dataloader v2.0.2)

Ecto source for Dataloader

This module defines a source struct and an implementation of the Dataloader.Source protocol for handling Ecto-related batching.

A simple Ecto source only needs to know about your application's Repo.

Basic Usage

Querying by primary key (analogous to Ecto.Repo.get/3):

source = Dataloader.Ecto.new(MyApp.Repo)

loader =
  Dataloader.new()
  |> Dataloader.add_source(Accounts, source)
  |> Dataloader.load(Accounts, User, 1)
  |> Dataloader.load_many(Accounts, Organization, [4, 9])
  |> Dataloader.run()

organizations = Dataloader.get_many(loader, Accounts, Organization, [4, 9])

Querying for associations. Here we look up the :users association on all the organizations, and the :organization for a single user.

loader =
  loader
  |> Dataloader.load(Accounts, :organization, user)
  |> Dataloader.load_many(Accounts, :users, organizations)
  |> Dataloader.run()

Querying by a column other than the primary key:

loader =
  loader
  |> Dataloader.load(Accounts, {:one, User}, name: "admin")
  |> Dataloader.run()

Here we pass a keyword list of length one. It is only possible to query by one column here; for more complex queries, see "filtering" below.

Notice that here we also need to specify the cardinality in the batch_key (:many or :one), which decides whether a list or a single value (or nil) is returned. This is because the column may not be a key, so there may be multiple matching records. Note also that even if :many returns several matching records, this is still a call to Dataloader.load/4 rather than Dataloader.load_many/4, because only one value is specified.
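
To make the effect of the cardinality concrete, here is a minimal pure-Elixir model of the shaping step. It is a sketch, not Dataloader's actual implementation: CardinalitySketch and shape/2 are hypothetical names, and raising when :one sees more than one match is an assumption of this sketch, since the text above only specifies "a single value (or nil)".

```elixir
defmodule CardinalitySketch do
  # Hypothetical helper, not part of Dataloader. `rows` stands for all the
  # records whose column matched the single value passed to load/4.

  # :many always yields a list, possibly empty.
  def shape(:many, rows) when is_list(rows), do: rows

  # :one yields nil when nothing matched, or the single matching record.
  def shape(:one, []), do: nil
  def shape(:one, [row]), do: row

  # Assumption of this sketch: more than one match under :one is an error.
  def shape(:one, rows) when is_list(rows) do
    raise ArgumentError, "expected at most one row, got #{length(rows)}"
  end
end

admins = [%{id: 1, name: "admin"}, %{id: 2, name: "admin"}]

many_result = CardinalitySketch.shape(:many, admins)
one_result = CardinalitySketch.shape(:one, [hd(admins)])
none_result = CardinalitySketch.shape(:one, [])
```

Here many_result is the full two-element list, one_result is the single map, and none_result is nil.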

Filtering / Ordering

Dataloader.Ecto.new/2 accepts a :query option: a 2-arity function that can apply broad ordering and filtering rules and handle parameters:

source = Dataloader.Ecto.new(MyApp.Repo, query: &Accounts.query/2)

loader =
  Dataloader.new()
  |> Dataloader.add_source(Accounts, source)

When we call Dataloader.load/4, we can pass a tuple as the batch key, pairing the queryable or association field with a keyword list of parameters:

# with a queryable
loader
|> Dataloader.load(Accounts, {User, order: :name}, 1)

# or an association
loader
|> Dataloader.load_many(Accounts, {:users, order: :name}, organizations)

# this is still supported
loader
|> Dataloader.load(Accounts, User, 1)

# as is this
loader
|> Dataloader.load(Accounts, :users, organization)

In all cases the Accounts.query function would be:

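# In the Accounts module; from/2 and field/2 require Ecto.Query.
import Ecto.Query

def query(User, params) do
  # Order by the requested field, falling back to the primary key.
  field = params[:order] || :id
  from u in User, order_by: [asc: field(u, ^field)]
end

def query(queryable, _) do
  queryable
end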

If we query something that ends up using the User schema, whether directly or via association, the query/2 function will match on the first clause and we can handle the params. If no params are supplied, the params arg defaults to source.default_params which itself defaults to %{}.

default_params is an extremely useful place to store values like the current user:

source = Dataloader.Ecto.new(MyApp.Repo, [
  query: &Accounts.query/2,
  default_params: %{current_user: current_user},
])

loader =
  Dataloader.new()
  |> Dataloader.add_source(Accounts, source)
  |> Dataloader.load_many(Accounts, Organization, ids)
  |> Dataloader.run()

# the query function
def query(Organization, %{current_user: user}) do
  from o in Organization,
    join: m in assoc(o, :memberships),
    where: m.user_id == ^user.id
end
def query(queryable, _) do
  queryable
end

In our query function we pattern match on the current user to make sure that we can only look up data in organizations the user actually has a membership in. Additional params you specify in the batch key, for example {Organization, %{order: :asc}}, are merged into default_params.
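
The merge can be pictured with plain maps. This is only a sketch: the exact merge Dataloader performs is not shown in this section, and the precedence used here (batch-key params override defaults when both define the same key) is an assumption. The variable names are illustrative.

```elixir
# Stand-in for the source's default_params.
default_params = %{current_user: %{id: 7}, order: :desc}

# Params as they would appear in a batch key such as
# {Organization, %{order: :asc}} or {Organization, order: :asc}.
batch_params = [order: :asc]

# Map.new/1 accepts both a map and a keyword list, so either batch-key
# style ends up as a map before merging.
# Assumption: batch-key params take precedence over defaults on conflict.
params = Map.merge(default_params, Map.new(batch_params))

# params is what query/2 would receive as its second argument:
# %{current_user: %{id: 7}, order: :asc}
IO.inspect(params)
```

Under this assumption the current user from default_params is still available to query/2, while the per-call ordering replaces the default one.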

Custom batch queries

There are cases where you want to run the batch function yourself. To do this we can add a custom run_batch/5 callback to our source.

The run_batch/5 function is executed with the query returned from the query/2 function.

For example, we want to get the post count for a set of users.

First we add a custom run_batch/5 function.

def run_batch(_, query, :post_count, users, repo_opts) do
  user_ids = Enum.map(users, & &1.id)
  default_count = 0

  result =
    query
    |> where([p], p.user_id in ^user_ids)
    |> group_by([p], p.user_id)
    |> select([p], {p.user_id, count("*")})
    |> Repo.all(repo_opts)
    |> Map.new()

  for %{id: id} <- users do
    [Map.get(result, id, default_count)]
  end
end

# Fallback to original run_batch
def run_batch(queryable, query, col, inputs, repo_opts) do
  Dataloader.Ecto.run_batch(Repo, queryable, query, col, inputs, repo_opts)
end

This function is supplied with a list of users, runs a query, and returns the post count for each user. If a user id is not found in the result set because the user has no posts, we return a post count of 0.
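
The important part of the callback is the final alignment step: one result per input, in input order, with a default for inputs that produced no rows. The following in-memory sketch shows just that step without a database. PostCountSketch and post_counts/2 are hypothetical names, and the list of maps stands in for the rows the query would aggregate.

```elixir
defmodule PostCountSketch do
  # Hypothetical in-memory stand-in for the custom run_batch/5 above:
  # `posts` plays the role of the rows the database query would aggregate.
  def post_counts(posts, users) do
    # Equivalent of the grouped count: %{user_id => number_of_posts}.
    counts = Enum.frequencies_by(posts, & &1.user_id)

    # One entry per input user, in input order; users without posts get 0.
    # Each count is wrapped in a list, mirroring the example above.
    for %{id: id} <- users, do: [Map.get(counts, id, 0)]
  end
end

posts = [%{user_id: 1, title: "foo"}, %{user_id: 1, title: "baz"}]
users = [%{id: 1}, %{id: 2}]

result = PostCountSketch.post_counts(posts, users)
# => [[2], [0]]
```

Keeping the output aligned with the inputs is what lets each loaded value be matched back to the user it was requested for.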

Now we need Dataloader to call run_batch/5. First we add a few posts to the database.

Then we pass the custom run_batch/5 function to the Dataloader source and load the post count for several users. When the loader runs, it calls the custom run_batch/5, and we can retrieve the post count for each individual user.

[user1, user2] = [%User{id: 1}, %User{id: 2}]

rows = [
  %{user_id: user1.id, title: "foo", published: true},
  %{user_id: user1.id, title: "baz", published: false}
]

_ = Repo.insert_all(Post, rows)

source =
  Dataloader.Ecto.new(
    Repo,
    query: &query/2,
    run_batch: &run_batch/5
  )

loader =
  Dataloader.new()
  |> Dataloader.add_source(Posts, source)

loader =
  loader
  |> Dataloader.load(Posts, {:one, Post}, post_count: user1)
  |> Dataloader.load(Posts, {:one, Post}, post_count: user2)
  |> Dataloader.run()

# Returns 2
Dataloader.get(loader, Posts, {:one, Post}, post_count: user1)
# Returns 0
Dataloader.get(loader, Posts, {:one, Post}, post_count: user2)

Additional params for the query/2 function can be passed to the load functions with a 3-tuple.

For example, to limit the above example to counting only published posts, we can add a query function that filters on the published flag:

def query(Post, %{published: published}) do
  from p in Post,
    where: p.published == ^published
end

def query(queryable, _) do
  queryable
end

We can then get the count of published posts by passing a 3-tuple as the batch key:

loader =
  loader
  |> Dataloader.load(Posts, {:one, Post}, post_count: user1)
  |> Dataloader.load(Posts, {:one, Post, %{published: true}}, post_count: user1)
  |> Dataloader.run()

# Returns 2
Dataloader.get(loader, Posts, {:one, Post}, post_count: user1)
# Returns 1
Dataloader.get(loader, Posts, {:one, Post, %{published: true}}, post_count: user1)
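
The batch key shapes used throughout this page can be summarized with a small pattern-matching sketch. BatchKeySketch and describe/1 are hypothetical names and not part of Dataloader; the :unspecified marker is an invention of this sketch for keys that carry no explicit cardinality.

```elixir
defmodule BatchKeySketch do
  # Hypothetical helper, not part of Dataloader: reduces each batch key shape
  # used in this document to {cardinality, target, params}.

  # {:one | :many, queryable, params}
  def describe({cardinality, target, params}) when cardinality in [:one, :many],
    do: {cardinality, target, Map.new(params)}

  # {:one | :many, queryable}
  def describe({cardinality, target}) when cardinality in [:one, :many],
    do: {cardinality, target, %{}}

  # {queryable_or_assoc, params}
  def describe({target, params}), do: {:unspecified, target, Map.new(params)}

  # Bare queryable module or association name.
  def describe(target) when is_atom(target), do: {:unspecified, target, %{}}
end

three_tuple = BatchKeySketch.describe({:one, Post, %{published: true}})
two_tuple = BatchKeySketch.describe({:one, Post})
with_params = BatchKeySketch.describe({:users, order: :name})
bare = BatchKeySketch.describe(User)
```

The order of the clauses matters: the cardinality guards must come first so that {:one, Post} is not mistaken for a key paired with params.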

Types

batch_fun()

@type batch_fun() ::
  (Ecto.Queryable.t(), Ecto.Query.t(), any(), [any()], repo_opts() -> [any()])

opt()

@type opt() ::
  {:query, query_fun()}
  | {:default_params, map()}
  | {:repo_opts, repo_opts()}
  | {:timeout, pos_integer()}
  | {:run_batch, batch_fun()}

query_fun()

@type query_fun() :: (Ecto.Queryable.t(), any() -> Ecto.Queryable.t())

repo_opts()

@type repo_opts() :: Keyword.t()

t()

@type t() :: %Dataloader.Ecto{
  batches: map(),
  default_params: map(),
  options: Keyword.t(),
  query: query_fun(),
  repo: Ecto.Repo.t(),
  repo_opts: repo_opts(),
  results: map(),
  run_batch: batch_fun()
}

Functions

new(repo, opts \\ [])

@spec new(Ecto.Repo.t(), [opt()]) :: t()

Create an Ecto Dataloader source.

This module handles retrieving data from Ecto for dataloader. It requires a valid Ecto Repo. It also accepts a :repo_opts option, which is handy for applying options to any calls to Repo functions that this module makes.

For example, you can use this module in a multi-tenant context by using the prefix option:

Dataloader.Ecto.new(MyApp.Repo, repo_opts: [prefix: "tenant"])

run_batch(repo, queryable, query, col, inputs, repo_opts)

@spec run_batch(
  repo :: Ecto.Repo.t(),
  queryable :: Ecto.Queryable.t(),
  query :: Ecto.Query.t(),
  col :: any(),
  inputs :: [any()],
  repo_opts :: repo_opts()
) :: [any()]

Default implementation for loading a batch. Handles looking up records by column.