WeaviateEx.Batch.Background (WeaviateEx v0.7.4)

Background batch processor using Elixir processes.

Provides continuous, asynchronous batch processing. The model is similar to a Python daemon thread, but it is built on Elixir's OTP patterns.

Features

  • Automatic flushing based on batch size or time interval
  • Concurrent request management with configurable limits
  • UUID tracking for reference ordering
  • Error tracking and retry support
  • Graceful shutdown with final flush
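
To make the size-or-interval flushing and the final flush on shutdown concrete, here is a minimal, self-contained sketch of the general technique. It is not WeaviateEx's implementation: `ToyBatcher`, its `add/2` function, and its use of `:on_flush` as a required callback receiving the raw batch are hypothetical, and it omits concurrency limits, UUID tracking, and retries.

```elixir
defmodule ToyBatcher do
  # Hypothetical illustration only; this is NOT how WeaviateEx implements batching.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Asynchronous enqueue: the caller does not wait for the flush.
  def add(server, item), do: GenServer.cast(server, {:add, item})

  @impl true
  def init(opts) do
    state = %{
      queue: [],
      batch_size: Keyword.get(opts, :batch_size, 100),
      flush_interval: Keyword.get(opts, :flush_interval, 1000),
      on_flush: Keyword.fetch!(opts, :on_flush)
    }

    schedule_tick(state)
    {:ok, state}
  end

  @impl true
  def handle_cast({:add, item}, state) do
    state = %{state | queue: [item | state.queue]}

    # Size trigger: flush as soon as the queue reaches batch_size.
    if length(state.queue) >= state.batch_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:tick, state) do
    # Time trigger: re-arm the timer, then flush whatever is queued.
    schedule_tick(state)
    {:noreply, flush(state)}
  end

  @impl true
  def terminate(_reason, state) do
    # Final flush when the process is stopped, e.g. via GenServer.stop/1.
    flush(state)
    :ok
  end

  defp schedule_tick(state) do
    Process.send_after(self(), :tick, state.flush_interval)
  end

  # Nothing queued: nothing to do.
  defp flush(%{queue: []} = state), do: state

  defp flush(state) do
    # The queue is built by prepending, so reverse it to restore insertion order.
    state.on_flush.(Enum.reverse(state.queue))
    %{state | queue: []}
  end
end
```

In this sketch a batch is delivered to the callback either when `batch_size` items have accumulated or when the timer fires, whichever comes first, and any remainder is delivered when the process stops.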

Examples

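alias WeaviateEx.Batch.Background

# Assumes `client` is an existing WeaviateEx.Client.t() and
# `articles` is a list of property maps.
{:ok, batcher} = Background.start_link(
  client: client,
  collection: "Article",
  batch_size: 100,
  concurrent_requests: 2
)

# Add objects asynchronously
for article <- articles do
  :ok = Background.add_object(batcher, article)
end

# Flush anything still queued, stop the process, and collect the results
results = Background.stop(batcher, flush: true)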

Types

option()

@type option() ::
  {:client, WeaviateEx.Client.t()}
  | {:collection, String.t()}
  | {:batch_size, pos_integer()}
  | {:concurrent_requests, pos_integer()}
  | {:flush_interval, pos_integer()}
  | {:on_flush, (WeaviateEx.Batch.ErrorTracking.Results.t() -> any())}
  | {:on_error, (WeaviateEx.Batch.ErrorTracking.ErrorObject.t() -> any())}
  | {:tenant, String.t()}

Functions

add_object(server, properties, opts \\ [])

@spec add_object(GenServer.server(), map(), keyword()) :: :ok

Add an object to the batch queue.

Options

  • :uuid - Explicit UUID for the object
  • :vector - Vector embedding
  • :vectors - Named vectors map
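
As a sketch of how the options above might be used, the helper below separates an application-level record into the `properties` map and the `opts` keyword list that `add_object/3` expects. The `ArticleIngest` module, the `:id` and `:embedding` keys, and the use of atom keys for properties are assumptions made for illustration; only `add_object/3` and the `:uuid` and `:vector` options come from the documentation above.

```elixir
defmodule ArticleIngest do
  # Hypothetical helper module; only Background.add_object/3 is part of WeaviateEx.
  alias WeaviateEx.Batch.Background

  # Split a record into {properties, opts}. The :id and :embedding keys are
  # assumptions about the caller's data, not something WeaviateEx requires.
  def split(article) do
    {uuid, rest} = Map.pop(article, :id)
    {vector, properties} = Map.pop(rest, :embedding)

    # Only pass the options that are actually present.
    opts = Enum.reject([uuid: uuid, vector: vector], fn {_key, value} -> is_nil(value) end)

    {properties, opts}
  end

  # Queue one record on a running batcher; returns :ok per the @spec above.
  def enqueue(batcher, article) do
    {properties, opts} = split(article)
    Background.add_object(batcher, properties, opts)
  end
end
```

When no `:uuid` is supplied the options list is simply empty; how the library then assigns an identifier is not stated here.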

add_reference(server, from_uuid, property, to_uuid, opts \\ [])

@spec add_reference(GenServer.server(), String.t(), String.t(), String.t(), keyword()) ::
  :ok

Add a reference to the batch queue.

References are held until both the source and target objects have been successfully processed.
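
The following sketch shows one way to queue two objects and a reference between them. The `RelatedArticles` module and the `"relatedTo"` property name are hypothetical, and the example assumes both objects live in the collection the batcher was started for. Explicit UUIDs are passed to `add_object/3` so that the same values can be reused in `add_reference/5`.

```elixir
defmodule RelatedArticles do
  # Hypothetical helper module; the Background calls follow the @specs above.
  alias WeaviateEx.Batch.Background

  # `from` and `to` are {uuid, properties} tuples supplied by the caller.
  def link(batcher, {from_uuid, from_props}, {to_uuid, to_props}) do
    :ok = Background.add_object(batcher, from_props, uuid: from_uuid)
    :ok = Background.add_object(batcher, to_props, uuid: to_uuid)

    # The reference can be queued right away; per the description above it is
    # held until both objects have been successfully processed.
    # "relatedTo" is an assumed reference property on the collection.
    :ok = Background.add_reference(batcher, from_uuid, "relatedTo", to_uuid)
  end
end
```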

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
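
A minimal sketch of running the batcher under a supervisor follows. `MyApp.BatchSupervisor` and its `batcher/0` lookup are hypothetical, and the sketch assumes the child spec uses the module name as its child id, which is the default for modules that `use GenServer`.

```elixir
defmodule MyApp.BatchSupervisor do
  # Hypothetical supervisor; `client` is an existing WeaviateEx.Client.t().
  use Supervisor

  alias WeaviateEx.Batch.Background

  def start_link(client) do
    Supervisor.start_link(__MODULE__, client, name: __MODULE__)
  end

  @impl true
  def init(client) do
    children = [
      # The {module, arg} tuple makes the supervisor call Background.child_spec/1
      # with the keyword list as init_arg.
      {Background, client: client, collection: "Article", batch_size: 100}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  # Look up the current batcher pid (assumes the default child id).
  def batcher do
    [{Background, pid, _type, _modules}] = Supervisor.which_children(__MODULE__)
    pid
  end
end
```

The pid returned by `batcher/0` can change if the supervisor restarts the child, so it is safer to look it up when needed than to cache it.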

flush(server)

@spec flush(GenServer.server()) :: :ok

Trigger an immediate flush of queued items.

get_results(server)

Get current accumulated results.
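
As a rough sketch, `flush/1` and `get_results/1` can be combined to take a checkpoint in the middle of a long import. The `Checkpoint` module is hypothetical. The documentation above does not say whether `flush/1` waits for the triggered requests to complete, so the results read immediately afterwards may not yet include them.

```elixir
defmodule Checkpoint do
  # Hypothetical helper module built on the documented flush/1 and get_results/1.
  alias WeaviateEx.Batch.Background

  def flush_and_report(batcher) do
    # Ask the batcher to send whatever is queued now instead of waiting for
    # the batch size or the flush interval.
    :ok = Background.flush(batcher)

    # Snapshot of the results accumulated so far. Assumption: this may lag
    # behind requests that are still in flight.
    Background.get_results(batcher)
  end
end
```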

get_state(server)

@spec get_state(GenServer.server()) :: map()

Get current state (for debugging/testing).

start_link(opts)

@spec start_link([option()]) :: GenServer.on_start()

Start a background batch processor.

Options

  • :client - WeaviateEx client (required)
  • :collection - Collection name (required)
  • :batch_size - Objects per batch (default: 100)
  • :concurrent_requests - Max concurrent requests (default: 2)
  • :flush_interval - Auto-flush interval in ms (default: 1000)
  • :on_flush - Callback on each flush completion
  • :on_error - Callback on each error
  • :tenant - Tenant name for multi-tenancy
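
The sketch below builds a complete options list, including both callbacks, and passes it to `start_link/1`. The `BatchOptions` module, the specific values, and the `"tenant-a"` name are illustrative assumptions. The callbacks only log their argument, because the fields of the results and error structs are not described here.

```elixir
defmodule BatchOptions do
  # Hypothetical helper module; the option names come from the list above.
  require Logger

  alias WeaviateEx.Batch.Background

  def build(client) do
    [
      client: client,
      collection: "Article",
      batch_size: 200,
      concurrent_requests: 4,
      # Milliseconds between automatic flushes.
      flush_interval: 500,
      on_flush: fn results -> Logger.debug("flush done: #{inspect(results)}") end,
      on_error: fn error -> Logger.warning("batch error: #{inspect(error)}") end,
      # Only meaningful if the collection has multi-tenancy enabled.
      tenant: "tenant-a"
    ]
  end

  # Returns GenServer.on_start(), e.g. {:ok, pid}.
  def start(client), do: client |> build() |> Background.start_link()
end
```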

stop(server, opts \\ [])

Stop the background processor.

Options

  • :flush - Whether to flush remaining items before stopping (default: true)
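
A short sketch of the two shutdown modes follows. The `Shutdown` module is hypothetical. No return type is specified for `stop/2` above; the module example binds its return value to `results`, so both functions simply pass it through. What happens to queued items when `:flush` is `false` is not stated, and the comment below marks that as an assumption.

```elixir
defmodule Shutdown do
  # Hypothetical helper module wrapping the documented stop/2.
  alias WeaviateEx.Batch.Background

  # Flush remaining items, then stop. This is the default (flush: true).
  def drain(batcher), do: Background.stop(batcher)

  # Stop without a final flush. Assumption: items still queued are not sent.
  def abort(batcher), do: Background.stop(batcher, flush: false)
end
```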