WeaviateEx.Batch.Concurrent (WeaviateEx v0.7.4)
View SourceConcurrent batch operations for high-throughput data insertion.
Provides parallel batch processing using Task.async_stream to maximize
insertion throughput while managing concurrency limits.
Features
- Configurable concurrency level
- Automatic batch splitting
- Result aggregation
- Optional ordered results
- Graceful partial failure handling
Examples
# Insert many objects concurrently
objects = [
%{class: "Article", properties: %{title: "First"}},
%{class: "Article", properties: %{title: "Second"}}
]
{:ok, result} = Concurrent.insert_many(client, "Article", objects,
max_concurrency: 4,
batch_size: 100
)
IO.puts("Inserted: #{result.successful_count}")
IO.puts("Failed: #{result.failed_count}")
Summary
Functions
Aggregates results from multiple batch operations.
Returns the default options for concurrent batch operations.
Inserts many objects concurrently using parallel batch requests.
Merges custom options with defaults.
Checks if an error is retryable.
Splits objects into batches based on batch_size option.
Functions
@spec aggregate_results( [ok: map(), error: term()], keyword() ) :: WeaviateEx.Batch.Concurrent.Result.t()
Aggregates results from multiple batch operations.
@spec default_options() :: keyword()
Returns the default options for concurrent batch operations.
@spec insert_many(WeaviateEx.Client.t(), String.t(), [map()], keyword()) :: {:ok, WeaviateEx.Batch.Concurrent.Result.t()} | {:error, term()}
Inserts many objects concurrently using parallel batch requests.
Options
:max_concurrency- Number of parallel batch requests (default: 4):batch_size- Objects per request (default: 100):ordered- Maintain insertion order in results (default: false):timeout- Timeout per batch in milliseconds (default: 30_000)
Examples
{:ok, result} = Concurrent.insert_many(client, "Article", objects)
{:ok, result} = Concurrent.insert_many(client, "Article", objects,
max_concurrency: 8,
batch_size: 50
)
Merges custom options with defaults.
Checks if an error is retryable.
Splits objects into batches based on batch_size option.