Snap.Bulk (Snap v0.9.0)

Supports streaming bulk operations against a Snap.Cluster.

Functions

perform(stream, cluster, index, opts \\ [])

@spec perform(
  stream :: Enumerable.t(),
  cluster :: module(),
  index :: String.t(),
  opts :: Keyword.t()
) :: :ok | Snap.Cluster.error() | {:error, Snap.BulkError.t()}

Performs a bulk operation.

Takes an Enumerable of action structs, such as Snap.Bulk.Action.Create. For example:

actions = [
  %Snap.Bulk.Action.Create{_id: 1, doc: %{foo: "bar"}},
  %Snap.Bulk.Action.Create{_id: 2, doc: %{foo: "bar"}},
  %Snap.Bulk.Action.Create{_id: 3, doc: %{foo: "bar"}}
]

actions
|> Snap.Bulk.perform(Cluster, "index")

It splits the Enumerable into pages and pauses between pages to give Elasticsearch time to catch up. Because it uses Stream under the hood, you can feed it a lazy stream of actions, for example one read from an Ecto.Repo, to bulk load documents from an SQL database.
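The paging behaviour described above can be pictured with the standard library alone. The following is a minimal, hypothetical sketch, not Snap's implementation: PagingSketch and send_page are illustrative names, and it assumes each page is handed to a caller-supplied function that stands in for the real bulk request.

```elixir
defmodule PagingSketch do
  # Hypothetical illustration of lazy paging; not Snap's actual code.
  # `send_page` stands in for the request that would submit one page.
  def run(actions, page_size, page_wait, send_page) do
    actions
    # Lazily group actions into lists of at most `page_size` elements;
    # the final page may be shorter.
    |> Stream.chunk_every(page_size)
    |> Stream.with_index()
    |> Enum.each(fn {page, index} ->
      # Pause before every page except the first.
      if index > 0, do: Process.sleep(page_wait)
      send_page.(page)
    end)
  end
end
```

For example, `PagingSketch.run(1..12, 5, 0, fn page -> IO.inspect(length(page)) end)` sends pages of 5, 5 and 2 items. Because Stream.chunk_every/2 is lazy, only one page needs to be held in memory at a time, which is what makes feeding in a database stream practical.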

If no errors occur on any page, it returns :ok. If errors occur on any page, it returns {:error, %Snap.BulkError{}} containing a list of the errors. Unless max_errors is set, it continues to the end of the stream even when errors occur. As the spec indicates, it may also return a Snap.Cluster.error().

Options:

  • page_size - the number of actions per page (default: 5000).
  • page_wait - the wait between pages, in milliseconds (default: 15000).
  • max_errors - aborts once the number of errors returned exceeds this count (default: nil, which runs to the end of the stream).
  • request_opts - options passed to Snap.Request.
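The interaction between error accumulation and max_errors can be sketched as follows. This is a hypothetical illustration, not Snap's code: ErrorLimitSketch and send_page are invented names, and it assumes send_page returns the list of errors for one page (empty on success). It returns :ok or {:error, errors}, mirroring the documented shapes with a plain list in place of Snap.BulkError.

```elixir
defmodule ErrorLimitSketch do
  # Hypothetical illustration of the max_errors option; not Snap's code.
  # `send_page` returns a list of the errors reported for one page.
  def run(pages, send_page, max_errors \\ nil) do
    errors =
      Enum.reduce_while(pages, [], fn page, acc ->
        acc = acc ++ send_page.(page)

        if is_integer(max_errors) and length(acc) > max_errors do
          # Limit exceeded: stop consuming the remaining pages.
          {:halt, acc}
        else
          # No limit (nil) or still under it: keep going.
          {:cont, acc}
        end
      end)

    case errors do
      [] -> :ok
      errors -> {:error, errors}
    end
  end
end
```

With max_errors left as nil, every page is processed and all errors are collected; with a limit of 1, processing stops after the page that pushes the count to 2.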

Any other options, such as pipeline: "foo", are passed through as query parameters to the Bulk API endpoint.
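One way to picture the split between the options listed above and the pass-through query parameters is with Keyword.split/2. This is a hypothetical sketch (OptsSketch is an invented name), not Snap's implementation; with Snap itself you would simply write something like Snap.Bulk.perform(actions, Cluster, "index", page_size: 1000, pipeline: "foo").

```elixir
defmodule OptsSketch do
  # Hypothetical illustration; option names come from the list above,
  # but this is not Snap's actual code.
  @bulk_opts [:page_size, :page_wait, :max_errors, :request_opts]

  def split(opts) do
    # Keyword.split/2 returns {taken, rest}: the recognised options,
    # and everything else, which would become query parameters.
    {bulk_opts, query_params} = Keyword.split(opts, @bulk_opts)
    {bulk_opts, URI.encode_query(query_params)}
  end
end
```

For example, `OptsSketch.split(page_size: 1000, pipeline: "foo", refresh: "true")` keeps `[page_size: 1000]` as a bulk option and encodes the rest as `"pipeline=foo&refresh=true"`.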