Dataloader (dataloader v2.0.2)

Dataloader

Dataloader provides an easy way to efficiently load data in batches. It's inspired by https://github.com/facebook/dataloader, although it makes some small API changes to better suit Elixir use cases.

Central to Dataloader is the idea of a source. A single Dataloader struct can have many different sources, which represent different ways to load data.

Here's an example of a dataloader using an Ecto source, and then loading some organization data.

source = Dataloader.Ecto.new(MyApp.Repo)

# setup the loader
loader = Dataloader.new |> Dataloader.add_source(:db, source)

# load some things
loader =
  loader
  |> Dataloader.load(:db, Organization, 1)
  |> Dataloader.load_many(:db, Organization, [4, 9])

# actually retrieve them
loader = Dataloader.run(loader)

# Now we can get whatever values out we want
organizations = Dataloader.get_many(loader, :db, Organization, [1,4])

This will do a single SQL query to get all organizations with ids 1, 4, and 9. You can load multiple batches from multiple sources, and then when run/1 is called each batch will be loaded concurrently.
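
The example reads values back with get_many/4; a single loaded value can be read with get/4, reusing the :db source and Organization batch key from above:

# id 9 was loaded via load_many/4 before run/1 was called
organization = Dataloader.get(loader, :db, Organization, 9)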

Here we named the source :db within our dataloader. More commonly, though, if you're using Phoenix you'll want to name it after one of your contexts, and have a different source used for each context. This provides an easy way to enforce data access rules within each context. See the Dataloader.Ecto moduledocs for more details.
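
As a sketch of that per-context setup (MyApp.Repo, the Accounts and Content context modules, and their query/2 functions are hypothetical names used for illustration):

loader =
  Dataloader.new()
  |> Dataloader.add_source(Accounts, Dataloader.Ecto.new(MyApp.Repo, query: &Accounts.query/2))
  |> Dataloader.add_source(Content, Dataloader.Ecto.new(MyApp.Repo, query: &Content.query/2))

Each context's query/2 callback can then scope or filter queries, keeping data access rules inside the context that owns the data.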

Options

There are two configuration options:

  • timeout - The maximum timeout to wait for running a source; defaults to 1s more than the maximum timeout of all added sources. Set with care; timeouts should really only be set on sources.
  • get_policy - This configures how the dataloader behaves when fetching data that may have errored when it was loaded.

These can be set as part of the new/1 call. So, for example, to configure a dataloader that returns nil on error with a 5s timeout:

loader =
  Dataloader.new(
    get_policy: :return_nil_on_error,
    timeout: :timer.seconds(5)
  )

get_policy

There are three implemented behaviours for this:

  • raise_on_error (default) - If successful, calling get/4 or get_many/4 will return the value. If there was an exception when trying to load any of the data, it will raise that exception.
  • return_nil_on_error - Behaves similarly to raise_on_error, but will return nil instead of raising. It will still log errors.
  • tuples - This will return {:ok, value}/{:error, reason} tuples depending on a successful or failed load, allowing for more fine-grained error handling if required (see the sketch after this list).
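
As a minimal sketch of the tuples policy (reusing the :db source, the source variable, and the Organization batch key from the example above):

loader =
  Dataloader.new(get_policy: :tuples)
  |> Dataloader.add_source(:db, source)
  |> Dataloader.load(:db, Organization, 1)
  |> Dataloader.run()

case Dataloader.get(loader, :db, Organization, 1) do
  {:ok, organization} -> organization
  {:error, reason} -> handle_load_error(reason)
end

Here handle_load_error/1 is a stand-in for whatever error handling the application needs.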

Summary

Functions

  • async_safely(mod, fun, args \\ [], opts \\ []) - A helper to run a set of async tasks in a separate supervision tree.

  • new(opts \\ []) - Create a new Dataloader instance.

  • pmap(items, fun, opts \\ []) - Deprecated; previously used for running source queries and batch fetches concurrently. Use async_safely/3 instead.

  • run_tasks(items, fun, opts \\ []) - Calls fun on all items asynchronously, returning a map of :ok/:error tuples keyed off the items.

Types

option()

@type option() :: {:timeout, pos_integer()} | {:get_policy, atom()}

source_name()

@type source_name() :: any()

t()

@type t() :: %Dataloader{
  options: [option()],
  sources: %{required(source_name()) => Dataloader.Source.t()}
}

Functions

add_source(loader, name, source)

@spec add_source(t(), source_name(), Dataloader.Source.t()) :: t()

async(fun)

@spec async((-> any())) :: Task.t()

See OpentelemetryProcessPropagator.Task.async/1.

async_safely(mod, fun, args \\ [], opts \\ [])

This is a helper method to run a set of async tasks in a separate supervision tree which:

  1. Is run by a supervisor linked to the main process. This ensures any async tasks will get killed if the main process is killed.
  2. Spawns a separate task which traps exits for running the provided function. This ensures we will always have some output, but are not setting :trap_exit on the main process.

NOTE: The provided fun must accept a Task.Supervisor as its first argument, as this function will prepend the relevant supervisor to args.

See run_task/3 for an example of a fun implementation; this will return whatever that returns.
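
As a rough sketch of the calling convention (MyApp.Fetcher and fetch_all/2 are hypothetical names; the supervision and error handling itself is done by async_safely):

defmodule MyApp.Fetcher do
  # The first argument is the Task.Supervisor that async_safely prepends;
  # the remaining arguments come from the args list given to async_safely.
  def fetch_all(supervisor, ids) do
    supervisor
    |> Task.Supervisor.async_stream(ids, fn id -> {id, load(id)} end)
    |> Enum.to_list()
  end

  defp load(id), do: {:loaded, id}
end

# Invokes MyApp.Fetcher.fetch_all(supervisor, [1, 2, 3]) under a supervised task
Dataloader.async_safely(MyApp.Fetcher, :fetch_all, [[1, 2, 3]])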

async_stream(items, fun, opts)

@spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()

See OpentelemetryProcessPropagator.Task.async_stream/3.

default_timeout()

get(loader, source, batch_key, item_key)

@spec get(t(), source_name(), any(), any()) :: any()

get_many(loader, source, batch_key, item_keys)

@spec get_many(t(), source_name(), any(), any()) :: [any()] | {:ok, [any()]}

load(loader, source_name, batch_key, val)

@spec load(t(), source_name(), any(), any()) :: t()

load_many(loader, source_name, batch_key, vals)

@spec load_many(t(), source_name(), any(), [any()]) :: t()

new(opts \\ [])

@spec new([option()]) :: t()

Create a new Dataloader instance.

See the moduledoc for available options.

pending_batches?(loader)

@spec pending_batches?(t()) :: boolean()

pmap(items, fun, opts \\ [])

This function is deprecated. Use async_safely/3 instead.
@spec pmap(list(), fun(), keyword()) :: map()

This used to be used by both the Dataloader module for running multiple source queries concurrently, and the KV and Ecto sources to actually run separate batch fetches (e.g. for Posts and Users at the same time).

The problem was that the behaviour between the sources and the parent Dataloader was actually slightly different. The Dataloader-specific behaviour has been pulled out into run_tasks/4.

Please use async_safely/3 instead of this for fetching data from sources.

put(loader, source_name, batch_key, item_key, result)

run(dataloader)

@spec run(t()) :: t()

run_tasks(items, fun, opts \\ [])

@spec run_tasks(list(), fun(), keyword()) :: map()

This helper function will call fun on all items asynchronously, returning a map of :ok/:error tuples, keyed off the items. For example:

iex> Dataloader.run_tasks([1,2,3], fn x -> x * x end, [])
%{
  1 => {:ok, 1},
  2 => {:ok, 4},
  3 => {:ok, 9}
}

Similarly, for errors:

iex> Dataloader.run_tasks([1,2,3], fn _x -> Process.sleep(5) end, [timeout: 1])
%{
  1 => {:error, :timeout},
  2 => {:error, :timeout},
  3 => {:error, :timeout}
}

By default, tasks are run asynchronously. To run them synchronously, provide async?: false.