Documentation for Adbc.Connection.
Connection are modelled as processes. They require
an Adbc.Database to be started.
Summary
Functions
Performs a bulk insert operation.
Same as bulk_insert/3 but raises an exception on error.
Returns a specification to start this module under a supervisor.
Get a binary (bytes) type option of the connection.
Gets the underlying driver of a connection process.
Get a float type option of the connection.
Get metadata about the database/driver.
Get an integer type option of the connection.
Get a hierarchical view of all catalogs, database schemas, tables, and columns.
Get a string type option of the connection.
Get a list of table types in the database.
Prepares the given query.
Runs the given query with params and statement_options.
Runs the given query with params and statement_options.
Same as query/4 but raises an exception on error.
Runs the given query with params and
pass the Adbc.StreamResult to the given function.
Set option for the connection.
Starts a connection process.
Types
@type t() :: GenServer.server()
Functions
@spec bulk_insert(t(), [Adbc.Column.t()] | Adbc.StreamResult.t(), Keyword.t()) :: {:ok, non_neg_integer()} | {:error, Exception.t()}
Performs a bulk insert operation.
This function creates a table (or appends to an existing one) and inserts a list of
Adbc.Columns in supported databases. This should be more efficient than using SQL
query in supported databases.
Alternatively, you can pass an Adbc.StreamResult.t() (obtained from
query_pointer/4) to efficiently insert query results without materializing the data.
Arguments
conn- The connection processcolumns_or_stream- Either a list ofAdbc.Column.t()or anAdbc.StreamResult.t()opts- Options for the bulk insert operation
Options
:table(required) - The name of the target table for bulk insert:mode(optional) - The ingestion mode. When not specified, the default behavior is driver-dependent but typically behaves like:create. Available modes::create- Create the table and insert data; error if the table already exists:append- Insert data into existing table; error if the table does not exist or if the schema does not match:replace- Drop the table if it exists, create it, and insert data:create_append- Create the table if it does not exist, otherwise append; error if the table exists but the schema does not match
:catalog(optional) - The catalog of the table. Support is driver-dependent. Not supported with:temporary.:schema(optional) - The database schema of the table. Support is driver-dependent. For example, SQLite does not support this option. Not supported with:temporary.:temporary(optional) - Iftrue, create a temporary table. Default isfalse. Cannot be used with:catalogor:schema.
Examples
columns = [
Adbc.Column.s64([1, 2, 3], name: "id"),
Adbc.Column.string(["Alice", "Bob", "Charlie"], name: "name")
]
# Create a new table
Adbc.Connection.bulk_insert(conn, columns, table: "users")
#=> {:ok, 3}
# Append to an existing table
Adbc.Connection.bulk_insert(conn, columns, table: "users", mode: :append)
#=> {:ok, 3}
# Create a temporary table
Adbc.Connection.bulk_insert(conn, columns, table: "temp_users", temporary: true)
#=> {:ok, 3}
# Replace an existing table
Adbc.Connection.bulk_insert(conn, columns, table: "users", mode: :replace)
#=> {:ok, 3}
# Efficiently insert from a query (within query_pointer callback)
# This is most useful for transferring across databases.
# Within the same database, you most likely have custom SQL commands,
# such as COPY, CREATE TEMPORARY TABLE, etc.
Adbc.Connection.query_pointer(source_conn, "SELECT * FROM source_table", fn stream ->
Adbc.Connection.bulk_insert(dest_conn, stream, table: "dest_table")
end)
@spec bulk_insert!(t(), [Adbc.Column.t()] | Adbc.StreamResult.t(), Keyword.t()) :: non_neg_integer()
Same as bulk_insert/3 but raises an exception on error.
Returns a specification to start this module under a supervisor.
See Supervisor.
Get a binary (bytes) type option of the connection.
Gets the underlying driver of a connection process.
Examples
ADBC.Connection.get_driver(conn)
#=> {:ok, :sqlite}
Get a float type option of the connection.
@spec get_info(t(), [non_neg_integer()]) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Get metadata about the database/driver.
The result is an Arrow dataset with the following schema:
| Field Name | Field Type | Null Constraint |
|---|---|---|
info_name | uint32 | not null |
info_value | INFO_SCHEMA |
INFO_SCHEMA is a dense union with members:
| Field Name | Type Code | Field Type |
|---|---|---|
string_value | 0 | utf8 |
bool_value | 1 | bool |
int64_value | 2 | int64 |
int32_bitmask | 3 | int32 |
string_list | 4 | list<utf8> |
int32_to_int32_list_map | 5 | map<int32, list<int32>> |
Each metadatum is identified by an integer code. The recognized codes are defined as constants. Codes [0, 10_000) are reserved for ADBC usage. Drivers/vendors will ignore requests for unrecognized codes (the row will be omitted from the result).
Get an integer type option of the connection.
@spec get_objects( t(), non_neg_integer(), catalog: String.t(), db_schema: String.t(), table_name: String.t(), table_type: [String.t()], column_name: String.t() ) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Get a hierarchical view of all catalogs, database schemas, tables, and columns.
The result is an Arrow dataset with the following schema:
| Field Name | Field Type |
|---|---|
catalog_name | utf8 |
catalog_db_schemas | list<DB_SCHEMA_SCHEMA> |
DB_SCHEMA_SCHEMA is a Struct with fields:
| Field Name | Field Type |
|---|---|
db_schema_name | utf8 |
db_schema_tables | list<TABLE_SCHEMA> |
TABLE_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Contstraint |
|---|---|---|
table_name | utf8 | not null |
table_type | utf8 | not null |
table_columns | list<COLUMN_SCHEMA> | |
table_constraints | list<CONSTRAINT_SCHEMA> |
COLUMN_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Contstraint | Comments |
|---|---|---|---|
column_name | utf8 | not null | |
ordinal_position | int32 | (1) | |
remarks | utf8 | (2) | |
xdbc_data_type | int16 | (3) | |
xdbc_type_name | utf8 | (3) | |
xdbc_column_size | int32 | (3) | |
xdbc_decimal_digits | int16 | (3) | |
xdbc_num_prec_radix | int16 | (3) | |
xdbc_nullable | int16 | (3) | |
xdbc_column_def | utf8 | (3) | |
xdbc_sql_data_type | int16 | (3) | |
xdbc_datetime_sub | int16 | (3) | |
xdbc_char_octet_length | int32 | (3) | |
xdbc_is_nullable | utf8 | (3) | |
xdbc_scope_catalog | utf8 | (3) | |
xdbc_scope_schema | utf8 | (3) | |
xdbc_scope_table | utf8 | (3) | |
xdbc_is_autoincrement | bool | (3) | |
xdbc_is_generatedcolumn | bool | (3) |
- The column's ordinal position in the table (starting from 1).
- Database-specific description of the column.
- Optional value. Should be null if not supported by the driver.
xdbc_values are meant to provide JDBC/ODBC-compatible metadata in an agnostic manner.
CONSTRAINT_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Contstraint | Comments |
|---|---|---|---|
constraint_name | utf8 | ||
constraint_type | utf8 | not null | (1) |
constraint_column_names | list<utf8> | not null | (2) |
constraint_column_usage | list<USAGE_SCHEMA> | (3) |
- One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
- The columns on the current table that are constrained, in order.
- For FOREIGN KEY only, the referenced table and columns.
USAGE_SCHEMA is a Struct with fields:
| Field Name | Field Type | Null Contstraint |
|---|---|---|
fk_catalog | utf8 | |
fk_db_schema | utf8 | |
fk_table | utf8 | not null |
fk_column_name | utf8 | not null |
Get a string type option of the connection.
@spec get_table_types(t()) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Get a list of table types in the database.
The result is an Arrow dataset with the following schema:
| Field Name | Field Type | Null Contstraint |
|---|---|---|
table_type | utf8 | not null |
@spec prepare(t(), binary()) :: {:ok, reference()} | {:error, Exception.t()}
Prepares the given query.
Runs the given query with params and statement_options.
This function requires pythonx to be installed, with the pyarrow
package available in the installation.
The return value is an ok-tuple with Pythonx.Object - an instance
of pyarrow.Table.
The table object can then be used to efficiently create a polars dataframe:
{:ok, py_table} = Adbc.Connection.py_query(conn, "SELECT * FROM ...", [])
Pythonx.eval(
"""
import polars
df = polars.from_arrow(py_table)
# ...
""",
%{"py_table" => py_table}
)or a pandas dataframe:
{:ok, py_table} = Adbc.Connection.py_query(conn, "SELECT * FROM ...", [])
Pythonx.eval(
"""
df = py_table.to_pandas()
# ...
""",
%{"py_table" => py_table}
)
@spec query(t(), binary() | reference(), [term()], Keyword.t()) :: {:ok, Adbc.Result.t()} | {:error, Exception.t()}
Runs the given query with params and statement_options.
It returns an ok-tuple with Adbc.Result or an error-tuple.
You often want to call Adbc.Result.materialize/1 or
Adbc.Result.to_map/1 on the results to consume it.
Same as query/4 but raises an exception on error.
It returns an Adbc.Result struct. You often want to call
Adbc.Result.materialize/1 or Adbc.Result.to_map/1 on the
results to consume it.
Runs the given query with params and
pass the Adbc.StreamResult to the given function.
The Adbc.StreamResult holds a pointer to a valid ArrowStream through
the duration of the function. A Adbc.StreamResult can only be consumed once.
The callback function should accept a single argument of type
Adbc.StreamResult.t(). For backwards compatibility, 2-arity
functions are still supported but deprecated (a warning will be emitted).
@spec set_option( pid(), atom() | String.t(), atom() | {:binary, iodata()} | String.t() | number() ) :: :ok | {:error, String.t()}
Set option for the connection.
- If
valueis an atom or a string, then corresponding string option will be set. - If
valueis a{:binary, iodata()}-tuple, then corresponding binary option will be set. - If
valueis an integer, then corresponding integer option will be set. - If
valueis a float, then corresponding float option will be set.
Starts a connection process.
Options
:database(required) - the database process to connect to:process_options- the options to be given to the underlying process. SeeGenServer.start_link/3for all options
Examples
Adbc.Connection.start_link(
database: MyApp.DB,
process_options: [name: MyApp.Conn]
)In your supervision tree it would be started like this:
children = [
{Adbc.Connection,
database: MyApp.DB,
process_options: [name: MyApp.Conn]}
]