EctoCursorBasedStream behaviour (ecto_cursor_based_stream v1.2.0)
Use this module in any module that uses Ecto.Repo
to add the cursor_based_stream/2 function to it.
Example:
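defmodule MyRepo do
  # :my_app and the Postgres adapter are placeholders; use your own settings
  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.Postgres
  use EctoCursorBasedStream
end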
MyUser
|> MyRepo.cursor_based_stream(max_rows: 100)
|> Stream.each(...)
|> Stream.run()
Types
@type cursor_based_stream_opts() :: [
  max_rows: non_neg_integer(),
  after_cursor: term() | %{required(atom()) => term()},
  cursor_field: atom() | [atom()],
  order: :asc | :desc,
  parallel: boolean(),
  prefix: String.t(),
  timeout: non_neg_integer(),
  log: false | Logger.level(),
  telemetry_event: term(),
  telemetry_options: term()
]
Callbacks
@callback cursor_based_stream(Ecto.Queryable.t(), cursor_based_stream_opts()) :: Enum.t()
Return a lazy enumerable that emits all entries from the data store matching the given query.
In contrast to Ecto.Repo.stream/2,
this does not rely on database mechanisms (e.g. database transactions) to stream the rows.
Instead, it sorts all the rows by :cursor_field and iterates over them in chunks
of size :max_rows.
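To make the chunked iteration concrete, here is a minimal sketch of the same idea over an in-memory list. It is not the library's implementation: the module name KeysetSketch and its functions are hypothetical, and a real version would presumably run a query filtered by the cursor, ordered by :cursor_field and limited to :max_rows instead of filtering a list.

```elixir
defmodule KeysetSketch do
  # Hypothetical stand-in for one database round trip: returns up to
  # max_rows rows whose id is greater than cursor, ordered by id.
  def fetch_chunk(rows, cursor, max_rows) do
    rows
    |> Enum.filter(fn row -> is_nil(cursor) or row.id > cursor end)
    |> Enum.sort_by(& &1.id)
    |> Enum.take(max_rows)
  end

  # Lazily emits every row after after_cursor, one chunk at a time.
  def stream(rows, max_rows, after_cursor \\ nil) do
    after_cursor
    |> Stream.unfold(fn cursor ->
      case fetch_chunk(rows, cursor, max_rows) do
        # No rows left: stop the stream.
        [] -> nil
        # Emit the chunk and remember the last id as the next cursor.
        chunk -> {chunk, List.last(chunk).id}
      end
    end)
    |> Stream.flat_map(& &1)
  end
end
```

Because each step only needs the last cursor value, no transaction or open database cursor has to be held between chunks. Passing a third argument, as in KeysetSketch.stream(rows, 2, 3), skips every row whose id is 3 or lower.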
Options
- :cursor_field - the field or list of fields by which all rows should be iterated. This field must have unique values. (Otherwise, some rows may get skipped.) For performance reasons, we recommend that you have an index on that field. Defaults to :id.
- :after_cursor - the value of :cursor_field after which results start. When :cursor_field is a list, :after_cursor must be a map whose keys are cursor fields (not all fields are required). Useful when you want to continue streaming from a certain point. Any rows with a value equal to or smaller than this value will not be included. Defaults to nil. (All rows will be included.)
- :max_rows - the number of rows to load from the database as we stream. Defaults to 500.
- :order - the order of results, :asc or :desc. Defaults to :asc.
- :parallel - when true, fetches the next batch of records in parallel with processing the stream. Defaults to false, as this spawns Tasks and could cause issues, e.g. with Ecto Sandbox in tests.
- :prefix, :timeout, :log, :telemetry_event, :telemetry_options - options passed directly to Ecto.Repo.all/2.
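The :parallel option can be pictured as prefetching: while the consumer works on one batch, a Task is already loading the next. The sketch below illustrates that pattern with Stream.resource/3 and Task.async/1. It is an assumption about the general technique, not the library's actual code; PrefetchSketch and fetch_fun are hypothetical names, and fetch_fun is assumed to take a cursor and return {rows, next_cursor}, with an empty rows list when nothing is left.

```elixir
defmodule PrefetchSketch do
  # fetch_fun is a hypothetical callback: cursor -> {rows, next_cursor}.
  def stream(fetch_fun, first_cursor) do
    Stream.resource(
      # Start loading the first batch in a separate process.
      fn -> Task.async(fn -> fetch_fun.(first_cursor) end) end,
      fn task ->
        case Task.await(task) do
          {[], _cursor} ->
            {:halt, nil}

          {rows, next_cursor} ->
            # Start the next fetch before handing rows to the consumer.
            next_task = Task.async(fn -> fetch_fun.(next_cursor) end)
            {rows, next_task}
        end
      end,
      # Clean up: stop a fetch that is still in flight if the consumer
      # stopped early.
      fn
        nil -> :ok
        task -> Task.shutdown(task, :brutal_kill)
      end
    )
  end
end
```

The fetch runs in a different process from the one consuming the stream, which is consistent with the warning above about the Ecto Sandbox in tests.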
Examples
MyUser
|> MyRepo.cursor_based_stream(max_rows: 1000)
|> Stream.each(...)
|> Stream.run()
# change order, run in parallel
MyUser
|> MyRepo.cursor_based_stream(order: :desc, parallel: true)
|> Stream.each(...)
|> Stream.run()
# change cursor field and set starting cursor
MyUser
|> MyRepo.cursor_based_stream(cursor_field: :email, after_cursor: "foo@bar.com")
|> Stream.each(...)
|> Stream.run()
# with multiple fields
MyUser
|> MyRepo.cursor_based_stream(cursor_field: [:email, :date_of_birth], after_cursor: %{email: "foo@bar.com"})
|> Stream.each(...)
|> Stream.run()
# select custom fields; remember to include the cursor_field in the select
# (select/3 and select_merge/3 require `import Ecto.Query`)
MyUser
|> select([u], map(u, [:my_id, ...]))
|> select_merge([u], ...)
|> MyRepo.cursor_based_stream(cursor_field: :my_id)
|> Stream.each(...)
|> Stream.run()
# pass custom options to Ecto.Repo.all/2
MyUser
|> MyRepo.cursor_based_stream(timeout: 60_000, prefix: "public")
|> Stream.each(...)
|> Stream.run()