WeaviateEx.Batch.Background (WeaviateEx v0.7.4)
Background batch processor using Elixir processes.
Provides continuous, asynchronous batch processing similar to Python's daemon thread model, but using Elixir's OTP patterns.
Features
- Automatic flushing based on batch size or time interval
- Concurrent request management with configurable limits
- UUID tracking for reference ordering
- Error tracking and retry support
- Graceful shutdown with final flush
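The first and last features above (flushing on batch size or on a timer, plus a final flush on shutdown) can be illustrated with a small GenServer. This is a minimal sketch, not WeaviateEx's implementation: the module name `FlushSketch`, its functions, and its state layout are hypothetical, and the only names borrowed from this page are the option keys `:batch_size`, `:flush_interval`, and `:on_flush`.

```elixir
defmodule FlushSketch do
  # Hypothetical module: illustrates size- or time-triggered flushing only.
  # It is not the WeaviateEx.Batch.Background implementation.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def add(server, item), do: GenServer.cast(server, {:add, item})
  def stop(server), do: GenServer.call(server, :stop)

  @impl true
  def init(opts) do
    state = %{
      queue: [],
      batch_size: Keyword.get(opts, :batch_size, 100),
      flush_interval: Keyword.get(opts, :flush_interval, 1000),
      on_flush: Keyword.fetch!(opts, :on_flush)
    }

    schedule_tick(state)
    {:ok, state}
  end

  @impl true
  def handle_cast({:add, item}, state) do
    state = %{state | queue: [item | state.queue]}

    # Size trigger: flush as soon as the queue reaches batch_size.
    if length(state.queue) >= state.batch_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:tick, state) do
    # Time trigger: flush whatever is queued, then re-arm the timer.
    schedule_tick(state)
    {:noreply, flush(state)}
  end

  @impl true
  def handle_call(:stop, _from, state) do
    # Graceful shutdown: flush the remainder before stopping.
    {:stop, :normal, :ok, flush(state)}
  end

  defp flush(%{queue: []} = state), do: state

  defp flush(state) do
    # The queue is built by prepending, so reverse it to restore insertion order.
    state.on_flush.(Enum.reverse(state.queue))
    %{state | queue: []}
  end

  defp schedule_tick(state) do
    Process.send_after(self(), :tick, state.flush_interval)
  end
end
```

In this sketch the callback runs inside the server process, so a slow callback delays later flushes; a real processor would likely hand the batch to a separate task.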
Examples
    # `client` and `articles` are assumed to be defined by the caller.
    alias WeaviateEx.Batch.Background

    {:ok, batcher} = Background.start_link(
      client: client,
      collection: "Article",
      batch_size: 100,
      concurrent_requests: 2
    )

    # Add objects asynchronously
    for article <- articles do
      :ok = Background.add_object(batcher, article)
    end

    # Get results and stop
    results = Background.stop(batcher, flush: true)
Summary
Functions
- add_object/3: Add an object to the batch queue.
- add_reference/5: Add a reference to the batch queue.
- child_spec/1: Returns a specification to start this module under a supervisor.
- flush/1: Trigger an immediate flush of queued items.
- get_results/1: Get current accumulated results.
- get_state/1: Get current state (for debugging/testing).
- start_link/1: Start a background batch processor.
- stop/2: Stop the background processor.
Types
@type option() ::
  {:client, WeaviateEx.Client.t()}
  | {:collection, String.t()}
  | {:batch_size, pos_integer()}
  | {:concurrent_requests, pos_integer()}
  | {:flush_interval, pos_integer()}
  | {:on_flush, (WeaviateEx.Batch.ErrorTracking.Results.t() -> any())}
  | {:on_error, (WeaviateEx.Batch.ErrorTracking.ErrorObject.t() -> any())}
  | {:tenant, String.t()}
Functions
@spec add_object(GenServer.server(), map(), keyword()) :: :ok
Add an object to the batch queue.
Options
- :uuid - Explicit UUID for the object
- :vector - Vector embedding
- :vectors - Named vectors map
@spec add_reference(GenServer.server(), String.t(), String.t(), String.t(), keyword()) :: :ok
Add a reference to the batch queue.
References are held until both the source and target objects have been successfully processed.
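The holding rule can be pictured as a filter over pending references. The following is a minimal sketch under stated assumptions: `RefGateSketch`, the map keys `:from`, `:property`, and `:to`, and the sample UUID strings are all hypothetical and do not reflect the library's internal data structures.

```elixir
defmodule RefGateSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Splits pending references into {ready, waiting}: a reference is ready
  # only when both its source and its target UUID have been processed.
  def split(pending, processed) do
    Enum.split_with(pending, fn %{from: from, to: to} ->
      MapSet.member?(processed, from) and MapSet.member?(processed, to)
    end)
  end
end

# UUIDs of objects that have already been processed successfully (sample data).
processed = MapSet.new(["uuid-a", "uuid-b"])

pending = [
  %{from: "uuid-a", property: "hasAuthor", to: "uuid-b"},
  %{from: "uuid-a", property: "hasAuthor", to: "uuid-c"}
]

# The first reference can be sent; the second waits for "uuid-c".
{ready, waiting} = RefGateSketch.split(pending, processed)
```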
child_spec/1: Returns a specification to start this module under a supervisor.
See Supervisor.
@spec flush(GenServer.server()) :: :ok
Trigger an immediate flush of queued items.
@spec get_results(GenServer.server()) :: WeaviateEx.Batch.ErrorTracking.Results.t()
Get current accumulated results.
@spec get_state(GenServer.server()) :: map()
Get current state (for debugging/testing).
@spec start_link([option()]) :: GenServer.on_start()
Start a background batch processor.
Options
- :client - WeaviateEx client (required)
- :collection - Collection name (required)
- :batch_size - Objects per batch (default: 100)
- :concurrent_requests - Max concurrent requests (default: 2)
- :flush_interval - Auto-flush interval in ms (default: 1000)
- :on_flush - Callback on each flush completion
- :on_error - Callback on each error
- :tenant - Tenant name for multi-tenancy
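To make the interaction between :batch_size and :concurrent_requests concrete, the sketch below chunks a list into batches and processes at most two batches at a time using the standard library's `Task.async_stream/3`. It only illustrates the idea of a concurrency cap; whether the library uses this mechanism internally is not stated here, and `send_batch` is a hypothetical stand-in for a network request.

```elixir
# Hypothetical stand-in for sending one batch; here it just sums the items.
send_batch = fn batch -> Enum.sum(batch) end

# Split ten items into batches of three (the last batch is smaller).
batches = Enum.chunk_every(1..10, 3)

results =
  batches
  # At most two batches are in flight at once; results keep input order.
  |> Task.async_stream(send_batch, max_concurrency: 2)
  |> Enum.map(fn {:ok, value} -> value end)

# results == [6, 15, 24, 10]
```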
@spec stop(GenServer.server(), keyword()) :: WeaviateEx.Batch.ErrorTracking.Results.t() | :ok
Stop the background processor.
Options
- :flush - Whether to flush remaining items before stopping (default: true)