EctoCursorBasedStream behaviour (ecto_cursor_based_stream v1.2.0)
Use this module in any module that uses Ecto.Repo to enrich it with the cursor_based_stream/2 function.
Example:
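defmodule MyRepo do
  # :my_app and the Postgres adapter are placeholders; use your own values.
  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.Postgres
  use EctoCursorBasedStream
end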
MyUser
|> MyRepo.cursor_based_stream(max_rows: 100)
|> Stream.each(...)
|> Stream.run()
Types
@type cursor_based_stream_opts() :: [
  max_rows: non_neg_integer(),
  after_cursor: term() | %{required(atom()) => term()},
  cursor_field: atom() | [atom()],
  order: :asc | :desc,
  parallel: boolean(),
  prefix: String.t(),
  timeout: non_neg_integer(),
  log: false | Logger.level(),
  telemetry_event: term(),
  telemetry_options: term()
]
Callbacks
@callback cursor_based_stream(Ecto.Queryable.t(), cursor_based_stream_opts()) :: Enum.t()
Return a lazy enumerable that emits all entries from the data store matching the given query.
In contrast to Ecto.Repo.stream/2, which must be called inside a transaction, this does not use database mechanisms (e.g. database transactions) to stream the rows. Instead, it sorts all the rows by :cursor_field and iterates over them in chunks of size :max_rows, fetching each chunk with a separate query.
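The chunked iteration described above is a form of keyset pagination. As a rough illustration only, the sketch below emulates it in plain Elixir over an in-memory list of maps instead of a database. The module `KeysetSketch`, its functions, and the rule of stopping on a short chunk are hypothetical and are not the library's implementation; each `fetch_chunk/5` call plays the role of one query that keeps only rows after the last cursor, sorts them by the cursor field, and takes at most `:max_rows` of them.

```elixir
defmodule KeysetSketch do
  # Hypothetical, in-memory illustration of cursor-based (keyset) streaming.
  # `rows` stands in for a table; each fetch_chunk/5 call stands in for one
  # query: rows after `cursor`, sorted by `field`, limited to `max_rows`.

  def fetch_chunk(rows, field, cursor, max_rows, order) do
    # "After the cursor" means greater for :asc and smaller for :desc.
    past_cursor? = if order == :asc, do: &Kernel.>/2, else: &Kernel.</2

    rows
    |> Enum.filter(fn row -> cursor == nil or past_cursor?.(Map.fetch!(row, field), cursor) end)
    |> Enum.sort_by(&Map.fetch!(&1, field), order)
    |> Enum.take(max_rows)
  end

  def stream(rows, opts \\ []) do
    field = Keyword.get(opts, :cursor_field, :id)
    max_rows = Keyword.get(opts, :max_rows, 500)
    order = Keyword.get(opts, :order, :asc)
    start = Keyword.get(opts, :after_cursor)

    Stream.resource(
      fn -> {:cont, start} end,
      fn
        :halt ->
          {:halt, :halt}

        {:cont, cursor} ->
          case fetch_chunk(rows, field, cursor, max_rows, order) do
            [] ->
              {:halt, :halt}

            chunk ->
              # The next cursor comes from the last row of this chunk, so the
              # cursor field must be present in every emitted row.
              next_cursor = Map.fetch!(List.last(chunk), field)

              # In this sketch, a chunk shorter than max_rows ends the stream.
              if length(chunk) < max_rows,
                do: {chunk, :halt},
                else: {chunk, {:cont, next_cursor}}
          end
      end,
      fn _ -> :ok end
    )
  end
end
```

For example, streaming five rows with `max_rows: 2` takes three chunks, and passing `after_cursor: 3` leaves out every row whose id is 3 or lower.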
Options
:cursor_field - the field or list of fields by which all rows should be iterated. This field (or combination of fields) must have unique values; otherwise, some rows may get skipped. For performance reasons, we recommend that you have an index on that field. Defaults to :id.
:after_cursor - the value of :cursor_field after which results start. When :cursor_field is a list, :after_cursor must be a map whose keys are cursor fields (not all fields are required). Useful when you want to continue streaming from a certain point. Any rows with a value equal to or smaller than this value will not be included. Defaults to nil (all rows are included).
:max_rows - the number of rows to load from the database per chunk as we stream. Defaults to 500.
:order - order of results, :asc or :desc. Defaults to :asc.
:parallel - when true, fetches the next batch of records in parallel with processing the stream. Defaults to false, as this spawns Tasks and could cause issues, e.g. with the Ecto Sandbox in tests.
:prefix, :timeout, :log, :telemetry_event, :telemetry_options - options passed directly to Ecto.Repo.all/2.
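The requirement that :cursor_field be unique can be seen in a small experiment. The sketch below is a hypothetical in-memory model (`DuplicateCursorSketch` and `collect/5` are illustrative names, not part of the library) that resumes each chunk strictly after the last key seen. With a non-unique key, a row sharing the boundary value is lost; with a unique composite key such as `{age, id}`, nothing is skipped. The composite case assumes the fields are compared lexicographically, which is how Elixir compares two tuples of the same size; the library's actual comparison for a list of fields may differ.

```elixir
defmodule DuplicateCursorSketch do
  # Hypothetical helper: collects rows in chunks of `max_rows`, ordered by
  # `key_fun`, resuming each chunk strictly after the last key seen.
  def collect(rows, key_fun, max_rows, cursor \\ nil, acc \\ []) do
    chunk =
      rows
      |> Enum.filter(fn row -> cursor == nil or key_fun.(row) > cursor end)
      |> Enum.sort_by(key_fun)
      |> Enum.take(max_rows)

    case chunk do
      # No rows left after the cursor: the iteration is complete.
      [] -> acc
      # Resume after the key of the last row in this chunk.
      _ -> collect(rows, key_fun, max_rows, key_fun.(List.last(chunk)), acc ++ chunk)
    end
  end
end
```

With `max_rows` set to 1 and `& &1.age` as the key, the first chunk ends on age 30, the next query asks only for ages strictly greater than 30, and the second age-30 row is never emitted.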
Examples
MyUser
|> MyRepo.cursor_based_stream(max_rows: 1000)
|> Stream.each(...)
|> Stream.run()
# change order, run in parallel
MyUser
|> MyRepo.cursor_based_stream(order: :desc, parallel: true)
|> Stream.each(...)
|> Stream.run()
# change cursor field and set starting cursor
MyUser
|> MyRepo.cursor_based_stream(cursor_field: :email, after_cursor: "foo@bar.com")
|> Stream.each(...)
|> Stream.run()
# with multiple fields
MyUser
|> MyRepo.cursor_based_stream(cursor_field: [:email, :date_of_birth], after_cursor: %{email: "foo@bar.com"})
|> Stream.each(...)
|> Stream.run()
# select custom fields; remember to include the cursor_field in the select
# (select/3 and select_merge/3 are macros from Ecto.Query)
import Ecto.Query
MyUser
|> select([u], map(u, [:my_id, ...]))
|> select_merge([u], ...)
|> MyRepo.cursor_based_stream(cursor_field: :my_id)
|> Stream.each(...)
|> Stream.run()
# pass custom options to Ecto.Repo.all/2
MyUser
|> MyRepo.cursor_based_stream(timeout: 60_000, prefix: "public")
|> Stream.each(...)
|> Stream.run()
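The :parallel option overlaps fetching with processing. The sketch below shows the general prefetching idea with `Task.async/1` and `Task.await/1` inside `Stream.resource/3`; it is not the library's code, and `PrefetchSketch`, `fetch_fun` and `cursor_fun` are hypothetical stand-ins for the per-chunk query and for reading the cursor from a row. Because each fetch runs in a separate Task process, it is the kind of setup the :parallel description warns about for the Ecto Sandbox.

```elixir
defmodule PrefetchSketch do
  # Hypothetical prefetching sketch: while the caller consumes chunk N, a Task
  # is already fetching chunk N + 1. `fetch_fun` receives the cursor to resume
  # after (nil for the first chunk); `cursor_fun` reads the cursor from a row.
  def stream(fetch_fun, cursor_fun) do
    Stream.resource(
      fn -> Task.async(fn -> fetch_fun.(nil) end) end,
      fn
        :done ->
          {:halt, :done}

        task ->
          case Task.await(task) do
            [] ->
              {:halt, :done}

            chunk ->
              next_cursor = cursor_fun.(List.last(chunk))
              # Start fetching the next chunk before handing this one out.
              next_task = Task.async(fn -> fetch_fun.(next_cursor) end)
              {chunk, next_task}
          end
      end,
      fn
        :done -> :ok
        # The consumer stopped early: cancel the pending prefetch.
        task -> Task.shutdown(task, :brutal_kill)
      end
    )
  end
end
```

If the consumer stops early, for example with `Enum.take/2`, the after function shuts down the prefetch Task that is still pending.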