EctoCursorBasedStream behaviour (ecto_cursor_based_stream v1.2.0)

Use this module in any module that uses Ecto.Repo to add the cursor_based_stream/2 function to it.

Example:

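defmodule MyRepo do
  # :my_app and the Postgres adapter are placeholders; use your own settings
  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.Postgres
  use EctoCursorBasedStream
end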

MyUser
|> MyRepo.cursor_based_stream(max_rows: 100)
|> Stream.each(...)
|> Stream.run()

Summary

Callbacks

Return a lazy enumerable that emits all entries from the data store matching the given query.

Types

cursor_based_stream_opts()

@type cursor_based_stream_opts() :: [
  max_rows: non_neg_integer(),
  after_cursor: term() | %{required(atom()) => term()},
  cursor_field: atom() | [atom()],
  order: :asc | :desc,
  parallel: boolean(),
  prefix: String.t(),
  timeout: non_neg_integer(),
  log: false | Logger.level(),
  telemetry_event: term(),
  telemetry_options: term()
]

Callbacks

cursor_based_stream(t, cursor_based_stream_opts)

@callback cursor_based_stream(Ecto.Queryable.t(), cursor_based_stream_opts()) :: Enum.t()

Return a lazy enumerable that emits all entries from the data store matching the given query.

Unlike Ecto.Repo.stream/2, this function does not rely on database mechanisms (e.g. database transactions) to stream the rows.

Instead, it sorts the rows by :cursor_field and loads them in chunks of at most :max_rows rows, each chunk starting after the last cursor value of the previous one.
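
To make the chunked iteration concrete, here is a minimal, self-contained sketch that runs over an in-memory list of maps instead of a database. It is not the library's implementation; the module and functions (CursorSketch.stream/4, fetch_chunk/4) are hypothetical, and each fetch_chunk/4 call stands in for one query of the form "WHERE cursor_field > last_value ORDER BY cursor_field LIMIT max_rows".

```elixir
defmodule CursorSketch do
  # Hypothetical sketch, not the library's code: keyset ("cursor-based")
  # iteration over an in-memory list of maps. Each fetch_chunk/4 call stands
  # in for one Repo.all/2 query.
  def stream(rows, cursor_field, max_rows, after_cursor \\ nil) do
    after_cursor
    |> Stream.unfold(fn cursor ->
      case fetch_chunk(rows, cursor_field, max_rows, cursor) do
        # An empty chunk means every row has been emitted.
        [] -> nil
        # Emit the chunk; the next cursor is the last row's cursor value.
        chunk -> {chunk, chunk |> List.last() |> Map.fetch!(cursor_field)}
      end
    end)
    # Flatten the chunks into a stream of individual rows.
    |> Stream.flat_map(& &1)
  end

  # In-memory analogue of:
  #   WHERE cursor_field > ^cursor ORDER BY cursor_field ASC LIMIT ^max_rows
  # A nil cursor means "start from the beginning".
  defp fetch_chunk(rows, field, max_rows, cursor) do
    rows
    |> Enum.filter(fn row -> is_nil(cursor) or Map.fetch!(row, field) > cursor end)
    |> Enum.sort_by(&Map.fetch!(&1, field))
    |> Enum.take(max_rows)
  end
end
```

With rows whose ids are 5, 3, 9, 1 and 7 and a max_rows of 2, the sketch loads the chunks [1, 3], [5, 7] and [9], then stops on an empty chunk. Because each chunk starts strictly after the last cursor value seen, a row that shares its cursor value with the last row of a chunk, but did not fit into that chunk, is filtered out of the next one. This is why the cursor field must be unique.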

Options

  • :cursor_field - the field, or list of fields, by which the rows are sorted and iterated.

    The field (or combination of fields) must have unique values; otherwise, some rows may be skipped.

    For performance reasons, we recommend an index on that field (or a composite index on those fields). Defaults to :id.

  • :after_cursor - the value of :cursor_field after which results start. When :cursor_field is a list, :after_cursor must be a map whose keys are cursor fields (not all fields are required).

    Useful when you want to continue streaming from a certain point. With the default :asc order, rows whose cursor value is equal to or smaller than this value are not included.

    Defaults to nil (all rows are included).

  • :max_rows - the maximum number of rows loaded from the database in each chunk while streaming.

    Defaults to 500.

  • :order - the order of results, either :asc or :desc.

    Defaults to :asc.

  • :parallel - when true, fetches the next batch of records while the current batch is being processed.

    Defaults to false, because this spawns Tasks, which can cause issues, e.g. with the Ecto SQL Sandbox in tests.

  • :prefix, :timeout, :log, :telemetry_event, :telemetry_options - options passed directly to Ecto.Repo.all/2
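
The :parallel option is a form of prefetching: the next batch is requested while the current one is being consumed. The following is a minimal sketch of that idea using Stream.resource/3 and Task.async/1. PrefetchSketch.stream/1 and the fetch callback contract (it receives the last cursor, nil on the first call, and returns {rows, last_cursor}) are hypothetical and only approximate what the library does; note that every fetch runs in a separate Task process.

```elixir
defmodule PrefetchSketch do
  # Hypothetical sketch of prefetching. `fetch` takes the last cursor seen
  # (nil on the first call) and returns {rows, last_cursor_of_rows}.
  def stream(fetch) do
    Stream.resource(
      # Start loading the first batch immediately.
      fn -> Task.async(fn -> fetch.(nil) end) end,
      fn task ->
        case Task.await(task) do
          # An empty batch ends the stream.
          {[], _} ->
            {:halt, :done}

          {rows, last_cursor} ->
            # Request the next batch before the consumer processes `rows`.
            next_task = Task.async(fn -> fetch.(last_cursor) end)
            {rows, next_task}
        end
      end,
      fn
        :done -> :ok
        # Consumer stopped early: discard the batch still being fetched.
        %Task{} = task -> Task.shutdown(task, :brutal_kill)
      end
    )
  end
end
```

When the consumer stops early (for example with Enum.take/2), the after-function shuts down the Task that is still fetching, so no process is left running.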

Examples

MyUser
|> MyRepo.cursor_based_stream(max_rows: 1000)
|> Stream.each(...)
|> Stream.run()

# change order, run in parallel
MyUser
|> MyRepo.cursor_based_stream(order: :desc, parallel: true)
|> Stream.each(...)
|> Stream.run()

# change cursor field and set starting cursor
MyUser
|> MyRepo.cursor_based_stream(cursor_field: :email, after_cursor: "foo@bar.com")
|> Stream.each(...)
|> Stream.run()

# with multiple fields
MyUser
|> MyRepo.cursor_based_stream(cursor_field: [:email, :date_of_birth], after_cursor: %{email: "foo@bar.com"})
|> Stream.each(...)
|> Stream.run()

# select custom fields; make sure the cursor field is included in the select
import Ecto.Query

MyUser
|> select([u], map(u, [:my_id, ...]))
|> select_merge([u], ...)
|> MyRepo.cursor_based_stream(cursor_field: :my_id)
|> Stream.each(...)
|> Stream.run()

# pass custom options to Ecto.Repo.all/2
MyUser
|> MyRepo.cursor_based_stream(timeout: 60_000, prefix: "public")
|> Stream.each(...)
|> Stream.run()
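
With a list such as [:email, :date_of_birth] as :cursor_field, "after the cursor" is naturally a lexicographic comparison over the listed fields. The sketch below, with hypothetical names, expresses that comparison in plain Elixir through tuple ordering, the in-memory analogue of a SQL row-value comparison such as (email, id) > ($1, $2). It assumes a complete cursor value for every field; how the library treats a partial :after_cursor map is not modeled. It uses strings and integers because Elixir's > compares structs such as Date structurally, not chronologically.

```elixir
defmodule CompositeCursorSketch do
  # Hypothetical helper: the composite cursor of a row, as a tuple of the
  # values of `fields`, in order.
  def cursor(row, fields) do
    fields |> Enum.map(&Map.fetch!(row, &1)) |> List.to_tuple()
  end

  # True when `row` sorts strictly after `cursor_value` in ascending order.
  # Same-sized tuples compare element by element, so the first field decides
  # and later fields only break ties.
  def after_cursor?(row, fields, cursor_value) do
    cursor(row, fields) > cursor_value
  end
end
```

For example, with the cursor {"a@x.com", 1}, a row with email "a@x.com" and id 2 comes after it (the tie on email is broken by id), and any row with a greater email comes after it regardless of id.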