SqlKit.DuckDB.Pool (sql_kit v0.2.0)


A supervised connection pool for DuckDB.

This module manages a pool of DuckDB connections with proper lifecycle management. The database is opened once, shared across all connections in the pool, and released when the pool terminates.

Usage

Add the pool to your application's supervision tree:

children = [
  {SqlKit.DuckDB.Pool,
    name: MyApp.AnalyticsPool,
    database: "priv/analytics.duckdb",
    pool_size: 4}
]

Then pass the pool reference to SqlKit functions:

pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool)
SqlKit.query_all(pool, "SELECT * FROM events", [])

Or get the pool reference from start_link:

{:ok, pool} = SqlKit.DuckDB.Pool.start_link(name: MyPool, database: ":memory:")
SqlKit.query_all(pool, "SELECT * FROM events", [])

Options

  • :name - Required. The name to register the pool under (atom)
  • :database - Required. Path to database file or ":memory:"
  • :pool_size - Number of connections. Default: 4
  • :config - Optional Duckdbex.Config struct for advanced configuration
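For example, advanced DuckDB settings can be passed through :config. The Duckdbex.Config field shown below is illustrative; see the Duckdbex documentation for the available options:

children = [
  {SqlKit.DuckDB.Pool,
    name: MyApp.AnalyticsPool,
    database: "priv/analytics.duckdb",
    pool_size: 4,
    # field name is illustrative; consult Duckdbex.Config for the real options
    config: %Duckdbex.Config{memory_limit: "1GB"}}
]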

Architecture

The pool is implemented as a supervision tree:

  • A Supervisor manages the overall lifecycle
  • A Database GenServer holds the database reference and releases it on terminate
  • A NimblePool manages the individual connections

This ensures the database is properly released when the pool stops, avoiding resource leaks.
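Sketched as a tree (the labels describe the roles above, not necessarily the library's internal module names):

Pool Supervisor
├── Database GenServer (holds the Duckdbex database reference, releases it on terminate)
└── NimblePool (manages the individual connections)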

Why Pool Connections?

Based on DuckDB's concurrency model:

  • Each connection is locked while a query executes
  • Multiple connections enable parallel query execution (see the example below)
  • Connection reuse is critical: disconnecting loses the in-memory cache
  • Recommended: a pool of ~4 connections for typical workloads
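For example, independent queries can run concurrently, each on its own pooled connection (a sketch; the table names are illustrative):

pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool)

# Each query checks out its own connection, so they can execute in parallel
["SELECT count(*) FROM events", "SELECT count(*) FROM sessions"]
|> Task.async_stream(fn sql -> SqlKit.DuckDB.Pool.query!(pool, sql, []) end)
|> Enum.map(fn {:ok, result} -> result end)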

Pool Behavior

  • Lazy initialization: Connections are created on-demand when first checked out, not at pool startup. This avoids startup latency.
  • Prepared statement caching: The pool caches prepared statements per connection. Repeated queries with the same SQL skip the prepare step.
  • Checkout timeout: All checkout operations have a configurable timeout (default: 5000ms). If no connection is available within the timeout, a NimblePool.TimeoutError is raised.
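The statement cache and the checkout timeout can both be tuned per call via the options documented under query!/4:

pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool)

# Bypass the prepared statement cache for a one-off query
SqlKit.DuckDB.Pool.query!(pool, "PRAGMA database_size", [], cache: false)

# Allow up to 30 seconds to check out a connection
SqlKit.DuckDB.Pool.query!(pool, "SELECT count(*) FROM events", [], timeout: 30_000)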

Summary

Functions

checkout!(pool, fun, opts \\ [])
  Checks out a connection from the pool and executes a function.

child_spec(init_arg)
  Returns a child specification for the pool.

pool(name)
  Creates a pool reference struct from a pool name.

query(pool, sql, params, opts \\ [])
  Executes a SQL query using prepared statement caching.

query!(pool, sql, params, opts \\ [])
  Executes a SQL query using prepared statement caching.

start_link(opts)
  Starts the connection pool.

with_stream!(pool, sql, params, fun, opts \\ [])
  Executes a SQL query and streams results through a callback function.

with_stream_and_columns!(pool, sql, params, fun, opts \\ [])
  Like with_stream!/5 but also provides column names to the callback.

Types

t()

@type t() :: %SqlKit.DuckDB.Pool{name: atom(), pid: pid()}

Functions

checkout!(pool, fun, opts \\ [])

@spec checkout!(t() | atom(), (SqlKit.DuckDB.Connection.t() -> result), keyword()) ::
  result
when result: term()

Checks out a connection from the pool and executes a function.

The connection is automatically returned to the pool after the function completes, even if it raises an exception.

Note: For queries, prefer using query!/4 which supports prepared statement caching. Use checkout!/2 for operations that need direct connection access.

Options

  • :timeout - Checkout timeout in milliseconds. Default: 5000

Examples

pool = SqlKit.DuckDB.Pool.pool(MyPool)
SqlKit.DuckDB.Pool.checkout!(pool, fn conn ->
  SqlKit.DuckDB.query!(conn, "SELECT * FROM events", [])
end)

child_spec(init_arg)

Returns a child specification for the pool.

Used by supervisors to start the pool as part of a supervision tree.

pool(name)

@spec pool(atom()) :: t()

Creates a pool reference struct from a pool name.

Use this to get a reference that can be passed to SqlKit functions.

Example

pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool)
SqlKit.query_all(pool, "SELECT * FROM events", [])

query(pool, sql, params, opts \\ [])

@spec query(t() | atom(), String.t(), list(), keyword()) ::
  {:ok, {[String.t()], [[term()]]}} | {:error, term()}

Executes a SQL query using prepared statement caching.

Returns {:ok, {columns, rows}} on success, {:error, reason} on failure.

See query!/4 for options.
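For example, matching on the documented return values:

pool = SqlKit.DuckDB.Pool.pool(MyPool)

case SqlKit.DuckDB.Pool.query(pool, "SELECT * FROM events WHERE id = $1", [1]) do
  {:ok, {_columns, rows}} -> rows
  {:error, reason} -> raise "query failed: #{inspect(reason)}"
end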

query!(pool, sql, params, opts \\ [])

@spec query!(t() | atom(), String.t(), list(), keyword()) ::
  {[String.t()], [[term()]]}

Executes a SQL query using prepared statement caching.

Prepared statements are cached per connection in the pool. Repeated queries with the same SQL will reuse the prepared statement, skipping the prepare step.

Options

  • :cache - Whether to use prepared statement caching. Default: true
  • :timeout - Checkout timeout in milliseconds. Default: 5000

Examples

pool = SqlKit.DuckDB.Pool.pool(MyPool)
SqlKit.DuckDB.Pool.query!(pool, "SELECT * FROM events WHERE id = $1", [1])
# => {["id", "name"], [[1, "click"]]}

# With custom timeout
SqlKit.DuckDB.Pool.query!(pool, sql, params, timeout: 10_000)

start_link(opts)

@spec start_link(keyword()) :: {:ok, t()} | {:error, term()}

Starts the connection pool.

Returns {:ok, pool} where pool is a struct that can be passed directly to SqlKit functions.

Options

  • :name - Required. The name to register the pool under
  • :database - Required. Path to database file or ":memory:"
  • :pool_size - Number of connections. Default: 4
  • :config - Optional Duckdbex.Config struct

Note on In-Memory Databases

For in-memory databases, all pool connections share the same database instance. This ensures data created on one connection is visible to others.
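For example (the table and values are illustrative):

{:ok, pool} = SqlKit.DuckDB.Pool.start_link(name: MyPool, database: ":memory:")

SqlKit.DuckDB.Pool.query!(pool, "CREATE TABLE events (id INTEGER, name TEXT)", [])
SqlKit.DuckDB.Pool.query!(pool, "INSERT INTO events VALUES (1, 'click')", [])

# This query may run on a different pooled connection,
# but it sees the same in-memory database
SqlKit.DuckDB.Pool.query!(pool, "SELECT * FROM events", [])
# => {["id", "name"], [[1, "click"]]}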

with_stream!(pool, sql, params, fun, opts \\ [])

@spec with_stream!(
  t() | atom(),
  String.t(),
  list(),
  (Enumerable.t() -> result),
  keyword()
) :: result
when result: term()

Executes a SQL query and streams results through a callback function.

The connection is held for the duration of the callback, which receives a stream of result chunks. This is useful for processing large result sets without loading everything into memory.

Options

  • :timeout - Checkout timeout in milliseconds. Default: 5000

Examples

pool = SqlKit.DuckDB.Pool.pool(MyPool)

# Process large result set in chunks
SqlKit.DuckDB.Pool.with_stream!(pool, "SELECT * FROM large_table", [], fn stream ->
  stream
  |> Stream.flat_map(& &1)
  |> Enum.reduce(0, fn _row, count -> count + 1 end)
end)
# => 1000000

# Take first 100 rows
SqlKit.DuckDB.Pool.with_stream!(pool, "SELECT * FROM events", [], fn stream ->
  stream
  |> Stream.flat_map(& &1)
  |> Enum.take(100)
end)

Notes

  • The connection is checked out for the entire duration of the callback
  • The callback must fully consume or abandon the stream before returning
  • Hugeint values are automatically converted to Elixir integers

with_stream_and_columns!(pool, sql, params, fun, opts \\ [])

@spec with_stream_and_columns!(
  t() | atom(),
  String.t(),
  list(),
  ({[String.t()], Enumerable.t()} -> result),
  keyword()
) :: result
when result: term()

Like with_stream!/5 but also provides column names to the callback.

The callback receives {columns, stream} where columns is a list of column names and stream is the chunk stream.

Options

  • :timeout - Checkout timeout in milliseconds. Default: 5000

Examples

SqlKit.DuckDB.Pool.with_stream_and_columns!(pool, "SELECT * FROM users", [], fn {cols, stream} ->
  IO.inspect(cols)  # => ["id", "name", "age"]
  stream |> Stream.flat_map(& &1) |> Enum.to_list()
end)