EctoCursorBasedStream behaviour (ecto_cursor_based_stream v1.0.2)

Use this module in any module that uses Ecto.Repo to add a cursor_based_stream/2 function to it.

Example:

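defmodule MyRepo do
  # Ecto.Repo requires :otp_app and :adapter; the values below are placeholders.
  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.Postgres
  use EctoCursorBasedStream
end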

MyUser
|> MyRepo.cursor_based_stream(max_rows: 100)
|> Stream.each(...)
|> Stream.run()

Summary

Callbacks

Return a lazy enumerable that emits all entries from the data store matching the given query.

Types

cursor_based_stream_opts()

@type cursor_based_stream_opts() :: [
  max_rows: integer(),
  after_cursor: String.t() | integer(),
  cursor_field: atom()
]

Callbacks

cursor_based_stream(t, cursor_based_stream_opts)

@callback cursor_based_stream(Ecto.Queryable.t(), cursor_based_stream_opts()) :: Enum.t()

Return a lazy enumerable that emits all entries from the data store matching the given query.

Unlike Ecto.Repo.stream/2, this function does not rely on database mechanisms (e.g. database transactions) to stream the rows.

Instead, it sorts the rows by options[:cursor_field] and iterates over them in chunks of at most options[:max_rows] rows.
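The chunked iteration described above can be illustrated with a minimal in-memory sketch. It is not the library's implementation: the module CursorStreamSketch and its fetch_chunk/4 helper are hypothetical, and a plain list of maps stands in for the database table. Each call to fetch_chunk/4 plays the role of one query that returns the next rows after the current cursor.

```elixir
defmodule CursorStreamSketch do
  # Hypothetical helper, not part of ecto_cursor_based_stream. It stands in
  # for one database query: keep rows after the cursor, sort them by the
  # cursor field, and take at most max_rows of them.
  def fetch_chunk(rows, cursor_field, after_cursor, max_rows) do
    rows
    |> Enum.filter(fn row ->
      is_nil(after_cursor) or Map.fetch!(row, cursor_field) > after_cursor
    end)
    |> Enum.sort_by(&Map.fetch!(&1, cursor_field))
    |> Enum.take(max_rows)
  end

  # Builds a lazy stream that loads one chunk at a time and emits its rows.
  # Option names and defaults mirror the documented options.
  def stream(rows, opts \\ []) do
    cursor_field = Keyword.get(opts, :cursor_field, :id)
    max_rows = Keyword.get(opts, :max_rows, 500)
    after_cursor = Keyword.get(opts, :after_cursor)

    after_cursor
    |> Stream.unfold(fn cursor ->
      case fetch_chunk(rows, cursor_field, cursor, max_rows) do
        # No rows left after the cursor: end the stream.
        [] -> nil
        # Emit the chunk and advance the cursor to its last row.
        chunk -> {chunk, Map.fetch!(List.last(chunk), cursor_field)}
      end
    end)
    |> Stream.flat_map(& &1)
  end
end

rows = for id <- [5, 3, 1, 4, 2], do: %{id: id}

# Full iteration in chunks of two rows.
ids = rows |> CursorStreamSketch.stream(max_rows: 2) |> Enum.map(& &1.id)

# Resuming after the row whose id is 3.
resumed =
  rows
  |> CursorStreamSketch.stream(max_rows: 2, after_cursor: 3)
  |> Enum.map(& &1.id)
```

In this sketch, ids holds every row in cursor order and resumed holds only the rows whose id is greater than 3, which is how :after_cursor lets a caller continue from the last processed row.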

Options

  • :cursor_field - the field by which all rows should be iterated.

    This field must have unique values. (Otherwise, some rows may get skipped.)

    For performance reasons, we recommend that you have an index on that field. Defaults to :id.

  • :after_cursor - only rows whose :cursor_field value is greater than this value are streamed.

    Useful when you want to continue streaming from a certain point. Rows with a value less than or equal to it are not included.

    Defaults to nil. (All rows will be included.)

  • :max_rows - the maximum number of rows to load from the database in each chunk as we stream.

    Defaults to 500.
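To see why :cursor_field must hold unique values, consider this small in-memory sketch. It is an illustration under assumptions, not the library's code: DuplicateCursorSketch and collect/5 are hypothetical names, and the rows are plain maps. When a chunk boundary falls between two rows that share the same cursor value, the next chunk asks only for values strictly greater than the cursor, so the second row is never loaded.

```elixir
defmodule DuplicateCursorSketch do
  # Hypothetical stand-in for chunked, cursor-based iteration over a list.
  # Each step loads up to max_rows rows whose field value is greater than
  # the cursor, then moves the cursor to the last row of that chunk.
  def collect(rows, field, max_rows, cursor \\ nil, acc \\ []) do
    chunk =
      rows
      |> Enum.filter(fn row -> is_nil(cursor) or Map.fetch!(row, field) > cursor end)
      |> Enum.sort_by(&Map.fetch!(&1, field))
      |> Enum.take(max_rows)

    case chunk do
      [] ->
        acc

      _ ->
        next_cursor = Map.fetch!(List.last(chunk), field)
        collect(rows, field, max_rows, next_cursor, acc ++ chunk)
    end
  end
end

rows = [
  %{id: 1, age: 20},
  %{id: 2, age: 30},
  %{id: 3, age: 30},
  %{id: 4, age: 40}
]

# Unique cursor field: every row is visited.
by_id = DuplicateCursorSketch.collect(rows, :id, 2)

# Non-unique cursor field: the first chunk ends at age 30 (id 2), and the
# next chunk starts at age > 30, so the row with id 3 is skipped.
by_age = DuplicateCursorSketch.collect(rows, :age, 2)
```

Here by_id contains all four rows, while by_age is missing the row with id 3.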

Example

MyUser
|> MyRepo.cursor_based_stream(max_rows: 100)
|> Stream.each(...)
|> Stream.run()