db_connection v2.0.0 DBConnection behaviour View Source
A behaviour module for implementing efficient database connection client processes, pools and transactions.
DBConnection
handles callbacks differently to most behaviours. Some
callbacks will be called in the calling process, with the state
copied to and from the calling process. This is useful when the data
for a request is large and means that a calling process can interact
with a socket directly.
A side effect of this is that query handling can be written in a simple blocking fashion, while the connection process itself will remain responsive to OTP messages and can enqueue and cancel queued requests.
If a request or series of requests takes too long to handle in the client process a timeout will trigger and the socket can be cleanly disconnected by the connection process.
If a calling process waits too long to start its request it will timeout and its request will be cancelled. This prevents requests building up when the database can not keep up.
If no requests are received for a period of time the connection will trigger an idle timeout and the database can be pinged to keep the connection alive.
Should the connection be lost, attempts will be made to reconnect with (configurable) exponential random backoff to reconnect. All state is lost when a connection disconnects but the process is reused.
The DBConnection.Query
protocol provide utility functions so that
queries can be prepared or encoded and results decoding without
blocking the connection or pool.
Link to this section Summary
Types
Run or transaction connection reference
Functions
Use DBConnection
to set the behaviour
Creates a supervisor child specification for a pool of connections
Close a prepared query on a database connection and return the result. Raises an exception on error
Close a prepared query on a database connection and return {:ok, result}
on
success or {:error, exception}
on error
Execute a prepared query with a database connection and return the result. Raises an exception on error
Execute a prepared query with a database connection and return
{:ok, query, result}
on success or {:error, exception}
if there was an error
Prepare a query with a database connection and return the prepared query. An exception is raised on error
Prepare a query with a database connection for later execution
Prepare a query and execute it with a database connection and return both the prepared query and result. An exception is raised on error
Prepare a query and execute it with a database connection and return both the
prepared query and the result, {:ok, query, result}
on success or
{:error, exception}
if there was an error
Create a stream that will prepare a query, execute it and stream results using a cursor
Reduces a previously built stream or prepared stream
Rollback a database transaction and release lock on connection
Acquire a lock on a connection and run a series of requests on it
Starts and links to a database connection process
Return the transaction status of a connection
Create a stream that will execute a prepared query and stream results using a cursor
Acquire a lock on a connection and run a series of requests inside a
transaction. The result of the transaction fun is return inside an :ok
tuple: {:ok, result}
Callbacks
Checks in the state to the connection process. Return {:ok, state}
to allow the checkin or {:disconnect, exception, state}
to disconnect
Checkouts the state from the connection process. Return {:ok, state}
to allow the checkout or {:disconnect, exception, state}
to disconnect
Connect to the database. Return {:ok, state}
on success or
{:error, exception}
on failure
Disconnect from the database. Return :ok
Handle the beginning of a transaction
Close a query prepared by handle_prepare/3
with the database. Return
{:ok, result, state}
on success and to continue,
{:error, exception, state}
to return an error and continue, or
{:disconnect, exception, state}
to return an error and disconnect
Handle committing a transaction. Return {:ok, result, state}
on successfully
committing transaction, {status, state}
to notify caller that the
transaction can not commit due to the transaction status status
,
{:error, exception, state}
(deprecated) to error and no longer be inside
transaction, or {:disconnect, exception, state}
to error and disconnect
Deallocate a cursor declared by handle_declare/4
with the database. Return
{:ok, result, state}
on success and to continue,
{:error, exception, state}
to return an error and continue, or
{:disconnect, exception, state}
to return an error and disconnect
Declare a cursor using a query prepared by handle_prepare/3
. Return
{:ok, cursor, state}
to start a cursor for a stream and continue,
{:ok, query, cursor, state}
to return altered query query
and cursor
cursor
for a stream and continue, {:error, exception, state}
to return an
error and continue or {:disconnect, exception, state}
to return an error
and disconnect
Execute a query prepared by handle_prepare/3
. Return
{:ok, result, state}
to return the result result
and continue,
{:ok, query, result, state}
to return altered query query
and result
result
and continue, {:error, exception, state}
to return an error and
continue or {:disconnect, exception, state}
to return an error and
disconnect
Fetch the next result from a cursor declared by handle_declare/4
. Return
{:cont, result, state}
to return the result result
and continue using
cursor, {:halt, result, state}
to return the result result
and close the
cursor, {:error, exception, state}
to return an error and close the
cursor, {:disconnect, exception, state}
to return an error and disconnect
Handle rolling back a transaction. Return {:ok, result, state}
on successfully
rolling back transaction, {status, state}
to notify caller that the
transaction can not rollback due to the transaction status status
,
{:error, exception, state}
(deprecated) to
error and no longer be inside transaction, or
{:disconnect, exception, state}
to error and disconnect
Handle getting the transaction status. Return {:idle, state}
if outside a
transaction, {:transaction, state}
if inside a transaction,
{:error, state}
if inside an aborted transaction, or
{:disconnect, exception, state}
to error and disconnect
Called when the connection has been idle for a period of time. Return
{:ok, state}
to continue or {:disconnect, exception, state}
to
disconnect
Link to this section Types
Link to this section Functions
Use DBConnection
to set the behaviour.
child_spec(module(), opts :: Keyword.t()) :: :supervisor.child_spec()
Creates a supervisor child specification for a pool of connections.
See start_link/2
for options.
Close a prepared query on a database connection and return the result. Raises an exception on error.
See close/3
.
close(conn(), query(), opts :: Keyword.t()) :: {:ok, result()} | {:error, Exception.t()}
Close a prepared query on a database connection and return {:ok, result}
on
success or {:error, exception}
on error.
This function should be used to free resources held by the connection process and/or the database server.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_close/3
.
See prepare/3
.
Execute a prepared query with a database connection and return the result. Raises an exception on error.
See execute/4
Execute a prepared query with a database connection and return
{:ok, query, result}
on success or {:error, exception}
if there was an error.
If the query is not prepared on the connection an attempt may be made to prepare it and then execute again.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_execute/4
.
See prepare/3
.
Prepare a query with a database connection and return the prepared query. An exception is raised on error.
See prepare/3
.
prepare(conn(), query(), opts :: Keyword.t()) :: {:ok, query()} | {:error, Exception.t()}
Prepare a query with a database connection for later execution.
It returns {:ok, query}
on success or {:error, exception}
if there was
an error.
The returned query
can then be passed to execute/4
and/or close/3
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_prepare/3
.
Example
DBConnection.transaction(pool, fn conn ->
query = %Query{statement: "SELECT * FROM table"}
query = DBConnection.prepare!(conn, query)
try do
DBConnection.execute!(conn, query, [])
after
DBConnection.close(conn, query)
end
end)
Prepare a query and execute it with a database connection and return both the prepared query and result. An exception is raised on error.
See prepare_execute/4
.
Prepare a query and execute it with a database connection and return both the
prepared query and the result, {:ok, query, result}
on success or
{:error, exception}
if there was an error.
The returned query
can be passed to execute/4
and close/3
.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
Example
query = %Query{statement: "SELECT id FROM table WHERE id=$1"}
{:ok, query, result} = DBConnection.prepare_execute(conn, query, [1])
{:ok, result2} = DBConnection.execute(conn, query, [2])
:ok = DBConnection.close(conn, query)
prepare_stream(t(), query(), params(), opts :: Keyword.t()) :: DBConnection.PrepareStream.t()
Create a stream that will prepare a query, execute it and stream results using a cursor.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_prepare/3
, handle_close/3
, handle_declare/4
,
and handle_deallocate/4
.
Example
{:ok, results} = DBConnection.transaction(conn, fn conn ->
query = %Query{statement: "SELECT id FROM table"}
stream = DBConnection.prepare_stream(conn, query, [])
Enum.to_list(stream)
end)
Reduces a previously built stream or prepared stream.
Rollback a database transaction and release lock on connection.
When inside of a transaction/3
call does a non-local return, using a
throw/1
to cause the transaction to enter a failed state and the
transaction/3
call returns {:error, reason}
. If transaction/3
calls are
nested the connection is marked as failed until the outermost transaction call
does the database rollback.
Example
{:error, :oops} = DBConnection.transaction(pool, fun(conn) ->
DBConnection.rollback(conn, :oops)
end)
Acquire a lock on a connection and run a series of requests on it.
The return value of this function is the return value of fun
.
To use the locked connection call the request with the connection
reference passed as the single argument to the fun
. If the
connection disconnects all future calls using that connection
reference will fail.
run/3
and transaction/3
can be nested multiple times but a
transaction/3
call inside another transaction/3
will be treated
the same as run/3
.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (default:15_000
)
The pool may support other options.
Example
{:ok, res} = DBConnection.run(conn, fn conn ->
DBConnection.execute!(conn, query, [])
end)
start_link(module(), opts :: Keyword.t()) :: GenServer.on_start()
Starts and links to a database connection process.
By default the DBConnection
starts a pool with a single connection.
The size of the pool can be increased with :pool_size
. A separate
pool can be given with the :pool
option.
Options
:backoff_min
- The minimum backoff interval (default:1_000
):backoff_max
- The maximum backoff interval (default:30_000
):backoff_type
- The backoff strategy,:stop
for no backoff and to stop,:exp
for exponential,:rand
for random and:rand_exp
for random exponential (default::rand_exp
):configure
- A function to run before every connect attempt to dynamically configure the options, either a 1-arity fun,{module, function, args}
with options prepended toargs
ornil
where only returned options are passed to connect callback (default:nil
):after_connect
- A function to run on connect usingrun/3
, either a 1-arity fun,{module, function, args}
withDBConnection.t/0
prepended toargs
ornil
(default:nil
):name
- A name to register the started process (see the:name
option inGenServer.start_link/3
):pool
- Chooses the pool to be started:pool_size
- Chooses the size of the pool
Example
{:ok, conn} = DBConnection.start_link(mod, [idle_interval: 5_000])
Return the transaction status of a connection.
The callback implementation should return the transaction status according to the database, and not make assumption based.
This function will raise a DBConnection.ConnectionError
when called inside a
deprecated transaction/3
.
Options
See module documentation. The pool and connection module may support other
options. All options are passed to handle_status/2
.
Example
# outside of the transaction, the status is `:idle`
DBConnection.status(conn) #=> :idle
DBConnection.transaction(conn, fn conn ->
DBConnection.status(conn) #=> :transaction
# run a query that will cause the transaction to rollback, e.g.
# uniqueness constraint violation
DBConnection.execute(conn, bad_query, [])
DBConnection.status(conn) #=> :error
end)
DBConnection.status(conn) #=> :idle
stream(t(), query(), params(), opts :: Keyword.t()) :: DBConnection.Stream.t()
Create a stream that will execute a prepared query and stream results using a cursor.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_declare/4
and handle_deallocate/4
.
Example
DBConnection.transaction(pool, fn conn ->
query = %Query{statement: "SELECT id FROM table"}
query = DBConnection.prepare!(conn, query)
try do
stream = DBConnection.stream(conn, query, [])
Enum.to_list(stream)
after
# Make sure query is closed!
DBConnection.close(conn, query)
end
end)
Acquire a lock on a connection and run a series of requests inside a
transaction. The result of the transaction fun is return inside an :ok
tuple: {:ok, result}
.
To use the locked connection call the request with the connection
reference passed as the single argument to the fun
. If the
connection disconnects all future calls using that connection
reference will fail.
run/3
and transaction/3
can be nested multiple times. If a transaction is
rolled back or a nested transaction fun
raises the transaction is marked as
failed. All calls except run/3
, transaction/3
, rollback/2
, close/3
and
close!/3
will raise an exception inside a failed transaction until the outer
transaction call returns. All transaction/3
calls will return
{:error, :rollback}
if the transaction failed or connection closed and
rollback/2
is not called for that transaction/3
.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (default:15_000
):log
- A function to log information about begin, commit and rollback calls made as part of the transaction, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t/0
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_begin/2
, handle_commit/2
and
handle_rollback/2
.
Example
{:ok, res} = DBConnection.transaction(conn, fn conn ->
DBConnection.execute!(conn, query, [])
end)
Link to this section Callbacks
checkin(state :: any()) :: {:ok, new_state :: any()} | {:disconnect, Exception.t(), new_state :: any()}
Checks in the state to the connection process. Return {:ok, state}
to allow the checkin or {:disconnect, exception, state}
to disconnect.
This callback is called when the control of the state is passed back
to the connection process. It should reverse any changes to the
connection state made in checkout/1
.
This callback is called in the connection process.
checkout(state :: any()) :: {:ok, new_state :: any()} | {:disconnect, Exception.t(), new_state :: any()}
Checkouts the state from the connection process. Return {:ok, state}
to allow the checkout or {:disconnect, exception, state}
to disconnect.
This callback is called when the control of the state is passed to
another process. checkin/1
is called with the new state when control
is returned to the connection process.
This callback is called in the connection process.
connect(opts :: Keyword.t()) :: {:ok, state :: any()} | {:error, Exception.t()}
Connect to the database. Return {:ok, state}
on success or
{:error, exception}
on failure.
If an error is returned it will be logged and another connection attempt will be made after a backoff interval.
This callback is called in the connection process.
disconnect(err :: Exception.t(), state :: any()) :: :ok
Disconnect from the database. Return :ok
.
The exception as first argument is the exception from a :disconnect
3-tuple returned by a previous callback.
If the state is controlled by a client and it exits or takes too long
to process a request the state will be last known state. In these
cases the exception will be a DBConnection.ConnectionError
.
This callback is called in the connection process.
Handle the beginning of a transaction.
Return {:ok, result, state}
to continue, {status, state}
to notify caller
that the transaction can not begin due to the transaction status status
,
{:error, exception, state}
(deprecated) to error without beginning the
transaction, or {:disconnect, exception, state}
to error and disconnect.
A callback implementation should only return status
if it
can determine the database’s transaction status without side effect.
This callback is called in the client process.
Close a query prepared by handle_prepare/3
with the database. Return
{:ok, result, state}
on success and to continue,
{:error, exception, state}
to return an error and continue, or
{:disconnect, exception, state}
to return an error and disconnect.
This callback is called in the client process.
Handle committing a transaction. Return {:ok, result, state}
on successfully
committing transaction, {status, state}
to notify caller that the
transaction can not commit due to the transaction status status
,
{:error, exception, state}
(deprecated) to error and no longer be inside
transaction, or {:disconnect, exception, state}
to error and disconnect.
A callback implementation should only return status
if it
can determine the database’s transaction status without side effect.
This callback is called in the client process.
Deallocate a cursor declared by handle_declare/4
with the database. Return
{:ok, result, state}
on success and to continue,
{:error, exception, state}
to return an error and continue, or
{:disconnect, exception, state}
to return an error and disconnect.
This callback is called in the client process.
Declare a cursor using a query prepared by handle_prepare/3
. Return
{:ok, cursor, state}
to start a cursor for a stream and continue,
{:ok, query, cursor, state}
to return altered query query
and cursor
cursor
for a stream and continue, {:error, exception, state}
to return an
error and continue or {:disconnect, exception, state}
to return an error
and disconnect.
This callback is called in the client process.
Execute a query prepared by handle_prepare/3
. Return
{:ok, result, state}
to return the result result
and continue,
{:ok, query, result, state}
to return altered query query
and result
result
and continue, {:error, exception, state}
to return an error and
continue or {:disconnect, exception, state}
to return an error and
disconnect.
This callback is called in the client process.
Fetch the next result from a cursor declared by handle_declare/4
. Return
{:cont, result, state}
to return the result result
and continue using
cursor, {:halt, result, state}
to return the result result
and close the
cursor, {:error, exception, state}
to return an error and close the
cursor, {:disconnect, exception, state}
to return an error and disconnect.
This callback is called in the client process.
Prepare a query with the database. Return {:ok, query, state}
where
query
is a query to pass to execute/4
or close/3
,
{:error, exception, state}
to return an error and continue or
{:disconnect, exception, state}
to return an error and disconnect.
This callback is intended for cases where the state of a connection is needed to prepare a query and/or the query can be saved in the database to call later.
This callback is called in the client process.
Handle rolling back a transaction. Return {:ok, result, state}
on successfully
rolling back transaction, {status, state}
to notify caller that the
transaction can not rollback due to the transaction status status
,
{:error, exception, state}
(deprecated) to
error and no longer be inside transaction, or
{:disconnect, exception, state}
to error and disconnect.
A callback implementation should only return status
if it
can determine the database’ transaction status without side effect.
This callback is called in the client and connection process.
handle_status(opts :: Keyword.t(), state :: any()) :: {:idle | :transaction | :error, new_state :: any()} | {:disconnect, Exception.t(), new_state :: any()}
Handle getting the transaction status. Return {:idle, state}
if outside a
transaction, {:transaction, state}
if inside a transaction,
{:error, state}
if inside an aborted transaction, or
{:disconnect, exception, state}
to error and disconnect.
If the callback returns a :disconnect
tuples then status/2
will return
:error
.
ping(state :: any()) :: {:ok, new_state :: any()} | {:disconnect, Exception.t(), new_state :: any()}
Called when the connection has been idle for a period of time. Return
{:ok, state}
to continue or {:disconnect, exception, state}
to
disconnect.
This callback is called if no callbacks have been called after the
idle timeout and a client process is not using the state. The idle
timeout can be configured by the :idle_interval
option. This function
can be called whether the connection is checked in or checked out.
This callback is called in the connection process.