WeaviateEx.Batch.Concurrent (WeaviateEx v0.7.4)

Concurrent batch operations for high-throughput data insertion.

Provides parallel batch processing using Task.async_stream to maximize insertion throughput while managing concurrency limits.

Features

  • Configurable concurrency level
  • Automatic batch splitting
  • Result aggregation
  • Optional ordered results
  • Graceful partial failure handling

Examples

alias WeaviateEx.Batch.Concurrent

# Insert many objects concurrently.
# `client` is an existing WeaviateEx.Client.t().
objects = [
  %{class: "Article", properties: %{title: "First"}},
  %{class: "Article", properties: %{title: "Second"}}
]

{:ok, result} = Concurrent.insert_many(client, "Article", objects,
  max_concurrency: 4,
  batch_size: 100
)

IO.puts("Inserted: #{result.successful_count}")
IO.puts("Failed: #{result.failed_count}")
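
The module description mentions Task.async_stream; the following is a minimal sketch of that general pattern, not the library's actual implementation. The module name ConcurrentSketch and the function fake_insert/1 are hypothetical stand-ins for a real batch request, and the option defaults mirror those documented for insert_many/4.

```elixir
defmodule ConcurrentSketch do
  # Hypothetical stand-in for one batch request; a real one would call Weaviate.
  def fake_insert(batch), do: {:ok, %{inserted: length(batch)}}

  # Splits `objects` into batches and processes them in parallel.
  def run(objects, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 100)
    max_concurrency = Keyword.get(opts, :max_concurrency, 4)
    timeout = Keyword.get(opts, :timeout, 30_000)

    objects
    |> Enum.chunk_every(batch_size)
    |> Task.async_stream(&fake_insert/1,
      max_concurrency: max_concurrency,
      timeout: timeout,
      ordered: false,
      # Kill a timed-out task instead of crashing the caller.
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      # The task finished; pass through whatever the batch function returned.
      {:ok, batch_result} -> batch_result
      # The task timed out or exited; report it as a failed batch.
      {:exit, reason} -> {:error, reason}
    end)
  end
end
```

With on_timeout: :kill_task, a slow batch surfaces as {:exit, :timeout} rather than terminating the calling process, which is one way to keep the other batches running when a single request fails.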

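Summary

Functions

aggregate_results(batch_results, opts \\ [])
  Aggregates results from multiple batch operations.

default_options()
  Returns the default options for concurrent batch operations.

insert_many(client, collection, objects, opts \\ [])
  Inserts many objects concurrently using parallel batch requests.

merge_options(opts)
  Merges custom options with defaults.

retryable_error?(arg1)
  Checks if an error is retryable.

split_into_batches(objects, opts)
  Splits objects into batches based on batch_size option.
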
Functions

aggregate_results(batch_results, opts \\ [])

@spec aggregate_results(
  [ok: map(), error: term()],
  keyword()
) :: WeaviateEx.Batch.Concurrent.Result.t()

Aggregates results from multiple batch operations.
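
As an illustration of what aggregation over a list of {:ok, map} and {:error, term} tuples can look like, here is a minimal sketch. It returns a plain map instead of the library's Result struct, and the per-batch keys :successful and :failed as well as the :errors field are assumptions made for this example only.

```elixir
defmodule AggregateSketch do
  @initial %{successful_count: 0, failed_count: 0, errors: []}

  # Folds per-batch results into a single summary map.
  def aggregate(batch_results) do
    summary =
      Enum.reduce(batch_results, @initial, fn
        # A batch that completed; it may still report per-object failures.
        # The :successful and :failed keys are hypothetical.
        {:ok, batch}, acc ->
          %{
            acc
            | successful_count: acc.successful_count + Map.get(batch, :successful, 0),
              failed_count: acc.failed_count + Map.get(batch, :failed, 0)
          }

        # A batch that failed as a whole; keep the reason for inspection.
        {:error, reason}, acc ->
          %{acc | errors: [reason | acc.errors]}
      end)

    # Reasons were prepended, so restore the original order.
    %{summary | errors: Enum.reverse(summary.errors)}
  end
end
```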

default_options()

@spec default_options() :: keyword()

Returns the default options for concurrent batch operations.

insert_many(client, collection, objects, opts \\ [])

@spec insert_many(WeaviateEx.Client.t(), String.t(), [map()], keyword()) ::
  {:ok, WeaviateEx.Batch.Concurrent.Result.t()} | {:error, term()}

Inserts many objects concurrently using parallel batch requests.

Options

  • :max_concurrency - Number of parallel batch requests (default: 4)
  • :batch_size - Objects per request (default: 100)
  • :ordered - Maintain insertion order in results (default: false)
  • :timeout - Timeout per batch in milliseconds (default: 30_000)

Examples

# With the default options
{:ok, result} = Concurrent.insert_many(client, "Article", objects)

# With custom concurrency and batch size
{:ok, result} = Concurrent.insert_many(client, "Article", objects,
  max_concurrency: 8,
  batch_size: 50
)
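
Because insert_many/4 can return either {:ok, result} or {:error, reason}, callers may prefer to match on both shapes rather than assert {:ok, result}. The sketch below shows one way to do that; InsertReport is a hypothetical helper, and it relies only on the successful_count and failed_count fields used in the module example.

```elixir
defmodule InsertReport do
  # Every object was inserted.
  def describe({:ok, %{successful_count: ok, failed_count: 0}}) do
    "inserted #{ok} objects"
  end

  # Partial failure: some objects were rejected.
  def describe({:ok, %{successful_count: ok, failed_count: failed}}) do
    "inserted #{ok} objects, #{failed} failed"
  end

  # The operation failed as a whole.
  def describe({:error, reason}) do
    "insert failed: #{inspect(reason)}"
  end
end

# Intended usage (requires a running Weaviate instance and a client):
#   client
#   |> Concurrent.insert_many("Article", objects)
#   |> InsertReport.describe()
#   |> IO.puts()
```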

merge_options(opts)

@spec merge_options(keyword()) :: keyword()

Merges custom options with defaults.
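
Merging caller options over defaults is commonly done with Keyword.merge/2. The sketch below assumes that approach and reuses the defaults documented for insert_many/4; OptionsSketch is a hypothetical module, and the library's own implementation may differ.

```elixir
defmodule OptionsSketch do
  # Defaults copied from the documented insert_many/4 options.
  def default_options do
    [max_concurrency: 4, batch_size: 100, ordered: false, timeout: 30_000]
  end

  # Values in `opts` override the defaults; missing keys fall back to them.
  def merge_options(opts), do: Keyword.merge(default_options(), opts)
end

merged = OptionsSketch.merge_options(batch_size: 50)
Keyword.fetch!(merged, :batch_size)
# => 50
Keyword.fetch!(merged, :max_concurrency)
# => 4
```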

retryable_error?(arg1)

@spec retryable_error?(term()) :: boolean()

Checks if an error is retryable.
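
This page does not say which errors the function treats as retryable. Purely as an illustration of the idea, the sketch below classifies a few hypothetical error shapes: timeouts and a handful of HTTP status codes that usually indicate a transient condition. The module RetrySketch, the error shapes, and the status list are all assumptions, not the library's behavior.

```elixir
defmodule RetrySketch do
  # Hypothetical choice of statuses that often signal a transient failure.
  @retryable_statuses [408, 429, 502, 503, 504]

  # Timeouts are usually worth retrying.
  def retryable?(:timeout), do: true
  def retryable?({:exit, :timeout}), do: true

  # Assumed error shape: a map carrying an HTTP status code.
  def retryable?(%{status: status}) when status in @retryable_statuses, do: true

  # Anything else (validation errors, bad requests, ...) is treated as permanent.
  def retryable?(_other), do: false
end
```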

split_into_batches(objects, opts)

@spec split_into_batches(
  [map()],
  keyword()
) :: [[map()]]

Splits objects into batches based on batch_size option.
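
Splitting a list into fixed-size batches can be expressed with Enum.chunk_every/2. The following sketch assumes that approach and a default batch size of 100, matching the documented :batch_size default; SplitSketch is a hypothetical module, not the library's code.

```elixir
defmodule SplitSketch do
  # Returns a list of batches, each with at most `:batch_size` objects.
  def split(objects, opts) do
    batch_size = Keyword.get(opts, :batch_size, 100)
    Enum.chunk_every(objects, batch_size)
  end
end

SplitSketch.split([1, 2, 3, 4, 5], batch_size: 2)
# => [[1, 2], [3, 4], [5]]
```

The last batch holds the remainder, so it may be smaller than the configured size.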