Adbc.Connection (adbc v0.9.0)

Documentation for Adbc.Connection.

Connections are modelled as processes. They require an Adbc.Database to be started.

Summary

Functions

Performs a bulk insert operation.

Same as bulk_insert/3 but raises an exception on error.

Returns a specification to start this module under a supervisor.

Get a binary (bytes) type option of the connection.

Gets the underlying driver of a connection process.

Get a float type option of the connection.

Get metadata about the database/driver.

Get an integer type option of the connection.

Get a hierarchical view of all catalogs, database schemas, tables, and columns.

Get a string type option of the connection.

Get a list of table types in the database.

Prepares the given query.

Runs the given query with params and statement_options.

Runs the given query with params and statement_options.

Same as query/4 but raises an exception on error.

Runs the given query with params and pass the Adbc.StreamResult to the given function.

Set option for the connection.

Starts a connection process.

Types

t()

@type t() :: GenServer.server()

Functions

bulk_insert(conn, columns_or_stream, opts \\ [])

@spec bulk_insert(t(), [Adbc.Column.t()] | Adbc.StreamResult.t(), Keyword.t()) ::
  {:ok, non_neg_integer()} | {:error, Exception.t()}

Performs a bulk insert operation.

This function creates a table (or appends to an existing one) and inserts a list of Adbc.Column structs. On supported databases, this is more efficient than issuing equivalent SQL statements.

Alternatively, you can pass an Adbc.StreamResult.t() (obtained from query_pointer/4) to efficiently insert query results without materializing the data.

Arguments

  • conn - The connection process
  • columns_or_stream - Either a list of Adbc.Column.t() or an Adbc.StreamResult.t()
  • opts - Options for the bulk insert operation

Options

  • :table (required) - The name of the target table for bulk insert

  • :mode (optional) - The ingestion mode. When not specified, the default behavior is driver-dependent but typically behaves like :create. Available modes:

    • :create - Create the table and insert data; error if the table already exists
    • :append - Insert data into existing table; error if the table does not exist or if the schema does not match
    • :replace - Drop the table if it exists, create it, and insert data
    • :create_append - Create the table if it does not exist, otherwise append; error if the table exists but the schema does not match
  • :catalog (optional) - The catalog of the table. Support is driver-dependent. Not supported with :temporary.

  • :schema (optional) - The database schema of the table. Support is driver-dependent. For example, SQLite does not support this option. Not supported with :temporary.

  • :temporary (optional) - If true, create a temporary table. Default is false. Cannot be used with :catalog or :schema.

Examples

columns = [
  Adbc.Column.s64([1, 2, 3], name: "id"),
  Adbc.Column.string(["Alice", "Bob", "Charlie"], name: "name")
]

# Create a new table
Adbc.Connection.bulk_insert(conn, columns, table: "users")
#=> {:ok, 3}

# Append to an existing table
Adbc.Connection.bulk_insert(conn, columns, table: "users", mode: :append)
#=> {:ok, 3}

# Create a temporary table
Adbc.Connection.bulk_insert(conn, columns, table: "temp_users", temporary: true)
#=> {:ok, 3}

# Replace an existing table
Adbc.Connection.bulk_insert(conn, columns, table: "users", mode: :replace)
#=> {:ok, 3}

# Efficiently insert from a query (within query_pointer callback)
# This is most useful for transferring across databases.
# Within the same database, you most likely have custom SQL commands,
# such as COPY, CREATE TEMPORARY TABLE, etc.
Adbc.Connection.query_pointer(source_conn, "SELECT * FROM source_table", fn stream ->
  Adbc.Connection.bulk_insert(dest_conn, stream, table: "dest_table")
end)

bulk_insert!(conn, columns_or_stream, opts \\ [])

@spec bulk_insert!(t(), [Adbc.Column.t()] | Adbc.StreamResult.t(), Keyword.t()) ::
  non_neg_integer()

Same as bulk_insert/3 but raises an exception on error.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_binary_option(conn, key)

@spec get_binary_option(pid(), atom() | String.t()) ::
  {:ok, binary()} | {:error, String.t()}

Get a binary (bytes) type option of the connection.

get_driver(conn)

@spec get_driver(t()) :: {:ok, atom() | String.t()} | :error

Gets the underlying driver of a connection process.

Examples

Adbc.Connection.get_driver(conn)
#=> {:ok, :sqlite}

get_float_option(conn, key)

@spec get_float_option(pid(), atom() | String.t()) ::
  {:ok, float()} | {:error, String.t()}

Get a float type option of the connection.

get_info(conn, info_codes \\ [])

@spec get_info(t(), [non_neg_integer()]) ::
  {:ok, Adbc.Result.t()} | {:error, Exception.t()}

Get metadata about the database/driver.

The result is an Arrow dataset with the following schema:

| Field Name | Field Type  | Null Constraint |
|------------|-------------|-----------------|
| info_name  | uint32      | not null        |
| info_value | INFO_SCHEMA |                 |

INFO_SCHEMA is a dense union with members:

| Field Name              | Type Code | Field Type              |
|-------------------------|-----------|-------------------------|
| string_value            | 0         | utf8                    |
| bool_value              | 1         | bool                    |
| int64_value             | 2         | int64                   |
| int32_bitmask           | 3         | int32                   |
| string_list             | 4         | list<utf8>              |
| int32_to_int32_list_map | 5         | map<int32, list<int32>> |

Each metadatum is identified by an integer code. The recognized codes are defined as constants. Codes [0, 10_000) are reserved for ADBC usage. Drivers/vendors will ignore requests for unrecognized codes (the row will be omitted from the result).
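For example, assuming a started connection `conn`, you can request specific metadata by code. In the ADBC standard, info code 0 is the vendor name and 100 is the driver name (support varies by driver):

```elixir
# Request vendor name (code 0) and driver name (code 100);
# unrecognized codes are simply omitted from the result.
{:ok, result} = Adbc.Connection.get_info(conn, [0, 100])

# Materialize the Arrow result into plain Elixir data.
result |> Adbc.Result.materialize() |> Adbc.Result.to_map()
```

Passing an empty list requests all metadata the driver supports.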

get_integer_option(conn, key)

@spec get_integer_option(pid(), atom() | String.t()) ::
  {:ok, integer()} | {:error, String.t()}

Get an integer type option of the connection.

get_objects(conn, depth, opts \\ [])

@spec get_objects(
  t(),
  non_neg_integer(),
  catalog: String.t(),
  db_schema: String.t(),
  table_name: String.t(),
  table_type: [String.t()],
  column_name: String.t()
) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}

Get a hierarchical view of all catalogs, database schemas, tables, and columns.

The result is an Arrow dataset with the following schema:

| Field Name         | Field Type             |
|--------------------|------------------------|
| catalog_name       | utf8                   |
| catalog_db_schemas | list<DB_SCHEMA_SCHEMA> |

DB_SCHEMA_SCHEMA is a Struct with fields:

| Field Name       | Field Type         |
|------------------|--------------------|
| db_schema_name   | utf8               |
| db_schema_tables | list<TABLE_SCHEMA> |

TABLE_SCHEMA is a Struct with fields:

| Field Name        | Field Type              | Null Constraint |
|-------------------|-------------------------|-----------------|
| table_name        | utf8                    | not null        |
| table_type        | utf8                    | not null        |
| table_columns     | list<COLUMN_SCHEMA>     |                 |
| table_constraints | list<CONSTRAINT_SCHEMA> |                 |

COLUMN_SCHEMA is a Struct with fields:

| Field Name              | Field Type | Null Constraint | Comments |
|-------------------------|------------|-----------------|----------|
| column_name             | utf8       | not null        |          |
| ordinal_position        | int32      |                 | (1)      |
| remarks                 | utf8       |                 | (2)      |
| xdbc_data_type          | int16      |                 | (3)      |
| xdbc_type_name          | utf8       |                 | (3)      |
| xdbc_column_size        | int32      |                 | (3)      |
| xdbc_decimal_digits     | int16      |                 | (3)      |
| xdbc_num_prec_radix     | int16      |                 | (3)      |
| xdbc_nullable           | int16      |                 | (3)      |
| xdbc_column_def         | utf8       |                 | (3)      |
| xdbc_sql_data_type      | int16      |                 | (3)      |
| xdbc_datetime_sub       | int16      |                 | (3)      |
| xdbc_char_octet_length  | int32      |                 | (3)      |
| xdbc_is_nullable        | utf8       |                 | (3)      |
| xdbc_scope_catalog      | utf8       |                 | (3)      |
| xdbc_scope_schema       | utf8       |                 | (3)      |
| xdbc_scope_table        | utf8       |                 | (3)      |
| xdbc_is_autoincrement   | bool       |                 | (3)      |
| xdbc_is_generatedcolumn | bool       |                 | (3)      |
  1. The column's ordinal position in the table (starting from 1).
  2. Database-specific description of the column.
  3. Optional value. Should be null if not supported by the driver. xdbc_ values are meant to provide JDBC/ODBC-compatible metadata in an agnostic manner.

CONSTRAINT_SCHEMA is a Struct with fields:

| Field Name              | Field Type         | Null Constraint | Comments |
|-------------------------|--------------------|-----------------|----------|
| constraint_name         | utf8               |                 |          |
| constraint_type         | utf8               | not null        | (1)      |
| constraint_column_names | list<utf8>         | not null        | (2)      |
| constraint_column_usage | list<USAGE_SCHEMA> |                 | (3)      |
  1. One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
  2. The columns on the current table that are constrained, in order.
  3. For FOREIGN KEY only, the referenced table and columns.

USAGE_SCHEMA is a Struct with fields:

| Field Name     | Field Type | Null Constraint |
|----------------|------------|-----------------|
| fk_catalog     | utf8       |                 |
| fk_db_schema   | utf8       |                 |
| fk_table       | utf8       | not null        |
| fk_column_name | utf8       | not null        |
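As a sketch, assuming a started connection `conn`: in the ADBC standard a depth of 0 returns all levels down to columns, and the keyword filters narrow the result (the table name below is illustrative):

```elixir
# Fetch the full hierarchy for tables named "users",
# restricted to ordinary tables (not views).
{:ok, result} =
  Adbc.Connection.get_objects(conn, 0,
    table_name: "users",
    table_type: ["table"]
  )

# Convert the nested Arrow result into Elixir data structures.
result |> Adbc.Result.materialize() |> Adbc.Result.to_map()
```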

get_string_option(conn, key)

@spec get_string_option(pid(), atom() | String.t()) ::
  {:ok, String.t()} | {:error, String.t()}

Get a string type option of the connection.

get_table_types(conn)

@spec get_table_types(t()) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}

Get a list of table types in the database.

The result is an Arrow dataset with the following schema:

| Field Name | Field Type | Null Constraint |
|------------|------------|-----------------|
| table_type | utf8       | not null        |
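For example, assuming a started connection `conn` (the exact table types returned depend on the driver):

```elixir
# List the table types the database supports, e.g. "table" and "view".
{:ok, result} = Adbc.Connection.get_table_types(conn)
result |> Adbc.Result.materialize() |> Adbc.Result.to_map()
```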

prepare(conn, query)

@spec prepare(t(), binary()) :: {:ok, reference()} | {:error, Exception.t()}

Prepares the given query.
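The returned reference can be passed to query/4 in place of a query string, avoiding re-parsing when the same statement is executed repeatedly. A sketch, assuming a started connection `conn` and an existing `users` table:

```elixir
# Prepare once...
{:ok, stmt} = Adbc.Connection.prepare(conn, "SELECT * FROM users WHERE id = ?")

# ...then execute many times with different parameters.
{:ok, result1} = Adbc.Connection.query(conn, stmt, [1])
{:ok, result2} = Adbc.Connection.query(conn, stmt, [2])
```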

py_query(conn, query, params \\ [], statement_options \\ [])

Runs the given query with params and statement_options.

This function requires pythonx to be installed, with the pyarrow package available in the installation.

The return value is an ok-tuple with Pythonx.Object - an instance of pyarrow.Table.

The table object can then be used to efficiently create a polars dataframe:

{:ok, py_table} = Adbc.Connection.py_query(conn, "SELECT * FROM ...", [])

Pythonx.eval(
  """
  import polars
  df = polars.from_arrow(py_table)

  # ...
  """,
  %{"py_table" => py_table}
)

or a pandas dataframe:

{:ok, py_table} = Adbc.Connection.py_query(conn, "SELECT * FROM ...", [])

Pythonx.eval(
  """
  df = py_table.to_pandas()

  # ...
  """,
  %{"py_table" => py_table}
)

query(conn, query, params \\ [], statement_options \\ [])

@spec query(t(), binary() | reference(), [term()], Keyword.t()) ::
  {:ok, Adbc.Result.t()} | {:error, Exception.t()}

Runs the given query with params and statement_options.

It returns an ok-tuple with Adbc.Result or an error-tuple. You often want to call Adbc.Result.materialize/1 or Adbc.Result.to_map/1 to consume the result.
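A sketch, assuming a started connection `conn` and an existing `users` table (parameters are bound positionally to `?` placeholders; placeholder syntax is driver-dependent):

```elixir
{:ok, result} =
  Adbc.Connection.query(conn, "SELECT id, name FROM users WHERE id > ?", [100])

# Consume the result as a map of column name to values.
result |> Adbc.Result.materialize() |> Adbc.Result.to_map()
```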

query!(conn, query, params \\ [], statement_options \\ [])

@spec query!(t(), binary() | reference(), [term()], Keyword.t()) :: Adbc.Result.t()

Same as query/4 but raises an exception on error.

It returns an Adbc.Result struct. You often want to call Adbc.Result.materialize/1 or Adbc.Result.to_map/1 to consume the result.

query_pointer(conn, query, params \\ [], fun, statement_options \\ [])

Runs the given query with params and pass the Adbc.StreamResult to the given function.

The Adbc.StreamResult holds a pointer to a valid ArrowStream for the duration of the function. An Adbc.StreamResult can only be consumed once.

The callback function should accept a single argument of type Adbc.StreamResult.t(). For backwards compatibility, 2-arity functions are still supported but deprecated (a warning will be emitted).
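A sketch, assuming started connections `source_conn` and `dest_conn` (names are illustrative); since the stream is only valid inside the callback, any consumption, such as handing it to bulk_insert/3, must happen there:

```elixir
Adbc.Connection.query_pointer(source_conn, "SELECT * FROM source_table", fn stream ->
  # `stream` is an Adbc.StreamResult, valid only within this function
  # and consumable exactly once.
  Adbc.Connection.bulk_insert(dest_conn, stream, table: "dest_table")
end)
```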

set_option(conn, key, value)

@spec set_option(
  pid(),
  atom() | String.t(),
  atom() | {:binary, iodata()} | String.t() | number()
) :: :ok | {:error, String.t()}

Set option for the connection.

  • If value is an atom or a string, the corresponding string option is set.
  • If value is a {:binary, iodata()}-tuple, the corresponding binary option is set.
  • If value is an integer, the corresponding integer option is set.
  • If value is a float, the corresponding float option is set.
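For instance, assuming a started connection `conn` (option keys and their supported values are driver-dependent; the autocommit key below comes from the ADBC standard, but not every driver honors it):

```elixir
# Set a string option; atoms and strings both map to string options.
:ok = Adbc.Connection.set_option(conn, "adbc.connection.autocommit", "true")

# Set a binary option with an explicit {:binary, iodata()} tuple
# (the key here is purely illustrative).
:ok = Adbc.Connection.set_option(conn, "some.binary.option", {:binary, <<1, 2, 3>>})
```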

start_link(opts)

Starts a connection process.

Options

  • :database (required) - the database process to connect to

  • :process_options - the options to be given to the underlying process. See GenServer.start_link/3 for all options

Examples

Adbc.Connection.start_link(
  database: MyApp.DB,
  process_options: [name: MyApp.Conn]
)

In your supervision tree it would be started like this:

children = [
  {Adbc.Connection,
   database: MyApp.DB,
   process_options: [name: MyApp.Conn]}
]