db_connection v1.1.0 DBConnection behaviour
A behaviour module for implementing efficient database connection client processes, pools and transactions.
DBConnection
handles callbacks differently to most behaviours. Some
callbacks will be called in the calling process, with the state
copied to and from the calling process. This is useful when the data
for a request is large and means that a calling process can interact
with a socket directly.
A side effect of this is that query handling can be written in a simple blocking fashion, while the connection process itself will remain responsive to OTP messages and can enqueue and cancel queued requests.
If a request or series of requests takes too long to handle in the client process a timeout will trigger and the socket can be cleanly disconnected by the connection process.
If a calling process waits too long to start its request it will timeout and its request will be cancelled. This prevents requests building up when the database can not keep up.
If no requests are received for a period of time the connection will trigger an idle timeout and the database can be pinged to keep the connection alive.
Should the connection be lost, attempts will be made to reconnect with (configurable) exponential random backoff to reconnect. All state is lost when a connection disconnects but the process is reused.
The DBConnection.Query
protocol provide utility functions so that
queries can be prepared or encoded and results decoding without
blocking the connection or pool.
By default the DBConnection
provides a single connection. However
the :pool
option can be set to use a pool of connections. If a
pool is used the module must be passed as an option - unless inside a
run/3
or transaction/3
fun and using the run/transaction
connection reference (t
).
Summary
Functions
Create a supervisor child specification for a pool of connections
Close a prepared query on a database connection and return {:ok, result}
on
success or {:error, exception}
on error
Close a prepared query on a database connection and return the result. Raises an exception on error
Ensures the given pool applications have been started
Execute a prepared query with a database connection and return
{:ok, result}
on success or {:error, exception}
if there was an
error
Execute a prepared query with a database connection and return the result. Raises an exception on error
Prepare a query with a database connection for later execution and
returns {:ok, query}
on success or {:error, exception}
if there was
an error
Prepare a query with a database connection and return the prepared query. An exception is raised on error
Prepare a query and execute it with a database connection and return both the
prepared query and the result, {:ok, query, result}
on success or
{:error, exception}
if there was an error
Prepare a query and execute it with a database connection and return both the prepared query and result. An exception is raised on error
Create a stream that will prepare a query, execute it and stream results using a cursor
Rollback a transaction, does not return
Acquire a lock on a connection and run a series of requests on it. The
result of the fun is return inside an :ok
tuple: {:ok, result}
Start and link to a database connection process
Create a stream that will execute a prepared query and stream results using a cursor
Acquire a lock on a connection and run a series of requests inside a
transaction. The result of the transaction fun is return inside an :ok
tuple: {:ok result}
Macros
Use DBConnection
to set the behaviour and include default
no-op implementations for ping/1
and handle_info/2
Callbacks
Checks in the state to the connection process. Return {:ok, state}
to allow the checkin or {:disconnect, exception}
to disconnect
Checkouts the state from the connection process. Return {:ok, state}
to allow the checkout or `{:disconnect, exception} to disconnect
Connect to the database. Return {:ok, state}
on success or
{:error, exception}
on failure
Disconnect from the database. Return :ok
Handle the beginning of a transaction. Return {:ok, result, state}
to
continue, {:error, exception, state}
to abort the transaction and
continue or {:disconnect, exception, state}
to abort the transaction
and disconnect
Close a query prepared by handle_prepare/3
with the database. Return
{:ok, result, state}
on success and to continue,
{:error, exception, state}
to return an error and continue, or
{:disconnect, exception, state}
to return an errior and disconnect
Handle commiting a transaction. Return {:ok, result, state}
on success and
to continue, {:error, exception, state}
to abort the transaction and
continue or {:disconnect, exception, state}
to abort the transaction
and disconnect
Deallocate a cursor declared by handle_declare/4' with the database. Return
{:ok, result, state}on success and to continue,
{:error, exception, state}to return an error and continue, or
{:disconnect, exception, state}` to return an errior and disconnect
Declare a cursor using a query prepared by handle_prepare/3
. Return
{:ok, cursor, state}' to start a cursor for a stream and continue,
{:error, exception, state}to return an error and continue or
{:disconnect, exception, state}` to return an error and disconnect
Execute a query prepared by handle_prepare/3
. Return
{:ok, result, state}
to return the result result
and continue,
{:error, exception, state}
to return an error and continue or
{:disconnect, exception, state}
to return an error and disconnect
Fetch the first result from a cursor declared by handle_declare/4
. Return
{:ok, result, state}
to return the result result
and continue,
{:deallocate, result, state}
to return the result result
and deallocate,
{:error, exception, state}
to return an error and close the cursor,
{:disconnect, exception, state}
to return an error and disconnect
Handle a message received by the connection process when checked in.
Return {:ok, state}
to continue or {:disconnect, exception,
state}
to disconnect
Fetch the next result from a cursor declared by handle_declare/4
. Return
{:ok, result, state}
to return the result result
and continue,
{:deallocate, result, state}
to return the result result
and deallocate,
{:error, exception, state}
to return an error and close the cursor,
{:disconnect, exception, state}
to return an error and disconnect
Handle rolling back a transaction. Return {:ok, result, state}
on success
and to continue, {:error, exception, state}
to abort the transaction
and continue or {:disconnect, exception, state}
to abort the
transaction and disconnect
Called when the connection has been idle for a period of time. Return
{:ok, state}
to continue or {:disconnect, exception}
to
disconnect
Types
Functions
Specs
child_spec(module, opts :: Keyword.t, child_opts :: Keyword.t) :: Supervisor.Spec.spec
Create a supervisor child specification for a pool of connections.
See Supervisor.Spec
for child options (child_opts
).
Specs
close(conn, query, opts :: Keyword.t) ::
{:ok, result} |
{:error, Exception.t}
Close a prepared query on a database connection and return {:ok, result}
on
success or {:error, exception}
on error.
This function should be used to free resources held by the connection process and/or the database server.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_close/3
.
See prepare/3
.
Close a prepared query on a database connection and return the result. Raises an exception on error.
See close/3
.
Specs
ensure_all_started(opts :: Keyword.t, type :: atom) ::
{:ok, [atom]} |
{:error, atom}
Ensures the given pool applications have been started.
Options
:pool
- TheDBConnection.Pool
module to use, (default:DBConnection.Connection
)
Execute a prepared query with a database connection and return
{:ok, result}
on success or {:error, exception}
if there was an
error.
If the query is not prepared on the connection an attempt may be made to prepare it and then execute again.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_execute/4
.
See prepare/3
.
Execute a prepared query with a database connection and return the result. Raises an exception on error.
See execute/4
Specs
prepare(conn, query, opts :: Keyword.t) ::
{:ok, query} |
{:error, Exception.t}
Prepare a query with a database connection for later execution and
returns {:ok, query}
on success or {:error, exception}
if there was
an error.
The returned query
can then be passed to execute/3
and/or close/3
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_prepare/3
.
Example
query = %Query{statement: "SELECT id FROM table"}
{:ok, query} = DBConnection.prepare(conn, query)
{:ok, result} = DBConnection.execute(conn, query, [])
:ok = DBConnection.close(conn, query)
Prepare a query with a database connection and return the prepared query. An exception is raised on error.
See prepare/3
.
Specs
Prepare a query and execute it with a database connection and return both the
prepared query and the result, {:ok, query, result}
on success or
{:error, exception}
if there was an error.
The returned query
can be passed to execute/4
and close/3
.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
Example
query = %Query{statement: “SELECT id FROM table WHERE id=$1”} {:ok, query, result} = DBConnection.prepare_execute(conn, query, [1]) {:ok, result2} = DBConnection.execute(conn, query, [2]) :ok = DBConnection.close(conn, query)
Prepare a query and execute it with a database connection and return both the prepared query and result. An exception is raised on error.
See prepare_execute/4
.
Specs
prepare_stream(t, query, params, opts :: Keyword.t) :: DBConnection.PrepareStream.t
Create a stream that will prepare a query, execute it and stream results using a cursor.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_prepare/3,
handle_close/3, handle_declare/4
,
handle_first/4
, handle_next/4' and
handle_deallocate/4`.
### Example
{:ok, results} = DBConnection.transaction(conn, fn(conn) ->
query = %Query{statement: “SELECT id FROM table”}
stream = DBConnection.prepare_stream(conn, query, [])
Enum.to_list(stream)
end)
Specs
rollback(t, reason :: any) :: no_return
Rollback a transaction, does not return.
Aborts the current transaction fun. If inside transaction/3
bubbles
up to the top level.
Example
{:error, :bar} = DBConnection.transaction(conn, fn(conn) ->
DBConnection.rollback(conn, :bar)
IO.puts "never reaches here!"
end)
Acquire a lock on a connection and run a series of requests on it. The
result of the fun is return inside an :ok
tuple: {:ok, result}
.
To use the locked connection call the request with the connection
reference passed as the single argument to the fun
. If the
connection disconnects all future calls using that connection
reference will fail.
run/3
and transaction/3
can be nested multiple times but a
transaction/3
call inside another transaction/3
will be treated
the same as run/3
.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (default:15_000
)
The pool may support other options.
Example
{:ok, res} = DBConnection.run(conn, fn(conn) ->
DBConnection.execute!(conn, "SELECT id FROM table", [])
end)
Specs
start_link(module, opts :: Keyword.t) :: GenServer.on_start
Start and link to a database connection process.
Options
:pool
- TheDBConnection.Pool
module to use, (default:DBConnection.Connection
):idle
- The idle strategy,:passive
to avoid checkin when idle and:active
to checkin when idle (default::passive
):idle_timeout
- The idle timeout to ping the database (default:1_000
):backoff_min
- The minimum backoff interval (default:1_000
):backoff_max
- The maximum backoff interval (default:30_000
):backoff_type
- The backoff strategy,:stop
for no backoff and to stop,:exp
for exponential,:rand
for random and:rand_exp
for random exponential (default::rand_exp
):after_connect
- A function to run on connect usingrun/3
, either a 1-arity fun,{module, function, args}
withDBConnection.t
prepended toargs
ornil
(default:nil
):name
- A name to register the started process (see the:name
option inGenServer.start_link/3
).
Example
{:ok, conn} = DBConnection.start_link(mod, [idle_timeout: 5_000])
Specs
stream(t, query, params, opts :: Keyword.t) :: DBConnection.Stream.t
Create a stream that will execute a prepared query and stream results using a cursor.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (ignored when using a run/transaction connection, default:15_000
):log
- A function to log information about a call, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_declare/4
, handle_first/4
, handle_next/4 and
handle_deallocate/4`.
### Example
{:ok, results} = DBConnection.transaction(conn, fn(conn) ->
query = %Query{statement: “SELECT id FROM table”}
query = DBConnection.prepare!(conn, query)
stream = DBConnection.stream(conn, query, [])
Enum.to_list(stream)
end)
Specs
Acquire a lock on a connection and run a series of requests inside a
transaction. The result of the transaction fun is return inside an :ok
tuple: {:ok result}
.
To use the locked connection call the request with the connection
reference passed as the single argument to the fun
. If the
connection disconnects all future calls using that connection
reference will fail.
run/3
and transaction/3
can be nested multiple times. If a transaction is
rolled back or a nested transaction fun
raises the transaction is marked as
failed. Any calls inside a failed transaction (except rollback/2
) will raise
until the outer transaction call returns. All running transaction/3
calls
will return {:error, :rollback}
if the transaction failed or connection
closed and rollback/2
is not called for that transaction/3
.
Options
:pool_timeout
- The maximum time to wait for a reply when making a synchronous call to the pool (default:5_000
):queue
- Whether to block waiting in an internal queue for the connection’s state (boolean, default:true
):timeout
- The maximum time that the caller is allowed the to hold the connection’s state (default:15_000
):log
- A function to log information about begin, commit and rollback calls made as part of the transaction, either a 1-arity fun,{module, function, args}
withDBConnection.LogEntry.t
prepended toargs
ornil
. SeeDBConnection.LogEntry
(default:nil
)
The pool and connection module may support other options. All options
are passed to handle_begin/2
, handle_commit/2
and
handle_rollback/2
.
Example
{:ok, res} = DBConnection.transaction(conn, fn(conn) ->
DBConnection.execute!(conn, "SELECT id FROM table", [])
end)
Macros
Use DBConnection
to set the behaviour and include default
no-op implementations for ping/1
and handle_info/2
.
Callbacks
Specs
checkin(state :: any) ::
{:ok, new_state :: any} |
{:disconnect, Exception.t, new_state :: any}
Checks in the state to the connection process. Return {:ok, state}
to allow the checkin or {:disconnect, exception}
to disconnect.
This callback is called when the control of the state is passed back
to the connection process. It should reverse any changes made in
checkout/2
.
This callback is called in the connection process.
Specs
checkout(state :: any) ::
{:ok, new_state :: any} |
{:disconnect, Exception.t, new_state :: any}
Checkouts the state from the connection process. Return {:ok, state}
to allow the checkout or {:disconnect, exception} to disconnect.
This callback is called when the control of the state is passed to
another process.
checkin/1is called with the new state when control
is returned to the connection process.
Messages are discarded, instead of being passed to
handle_info/2`,
when the state is checked out.
This callback is called in the connection process.
Specs
connect(opts :: Keyword.t) ::
{:ok, state :: any} |
{:error, Exception.t}
Connect to the database. Return {:ok, state}
on success or
{:error, exception}
on failure.
If an error is returned it will be logged and another connection attempt will be made after a backoff interval.
This callback is called in the connection process.
Specs
disconnect(err :: Exception.t, state :: any) :: :ok
Disconnect from the database. Return :ok
.
The exception as first argument is the exception from a :disconnect
3-tuple returned by a previous callback.
If the state is controlled by a client and it exits or takes too long
to process a request the state will be last known state. In these
cases the exception will be a DBConnection.ConnectionError
.
This callback is called in the connection process.
Specs
handle_begin(opts :: Keyword.t, state :: any) ::
{:ok, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Handle the beginning of a transaction. Return {:ok, result, state}
to
continue, {:error, exception, state}
to abort the transaction and
continue or {:disconnect, exception, state}
to abort the transaction
and disconnect.
This callback is called in the client process.
Specs
handle_close(query, opts :: Keyword.t, state :: any) ::
{:ok, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Close a query prepared by handle_prepare/3
with the database. Return
{:ok, result, state}
on success and to continue,
{:error, exception, state}
to return an error and continue, or
{:disconnect, exception, state}
to return an errior and disconnect.
This callback is called in the client process.
Specs
handle_commit(opts :: Keyword.t, state :: any) ::
{:ok, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Handle commiting a transaction. Return {:ok, result, state}
on success and
to continue, {:error, exception, state}
to abort the transaction and
continue or {:disconnect, exception, state}
to abort the transaction
and disconnect.
This callback is called in the client process.
Specs
handle_deallocate(query, cursor, opts :: Keyword.t, state :: any) ::
{:ok, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Deallocate a cursor declared by handle_declare/4' with the database. Return
{:ok, result, state}on success and to continue,
{:error, exception, state}to return an error and continue, or
{:disconnect, exception, state}` to return an errior and disconnect.
This callback is called in the client process.
Specs
handle_declare(query, params, opts :: Keyword.t, state :: any) ::
{:ok, cursor, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Declare a cursor using a query prepared by handle_prepare/3
. Return
{:ok, cursor, state}' to start a cursor for a stream and continue,
{:error, exception, state}to return an error and continue or
{:disconnect, exception, state}` to return an error and disconnect.
This callback is called in the client process.
Specs
handle_execute(query, params, opts :: Keyword.t, state :: any) ::
{:ok, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Execute a query prepared by handle_prepare/3
. Return
{:ok, result, state}
to return the result result
and continue,
{:error, exception, state}
to return an error and continue or
{:disconnect, exception, state}
to return an error and disconnect.
This callback is called in the client process.
Specs
handle_first(query, cursor, opts :: Keyword.t, state :: any) ::
{:ok | :deallocate, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Fetch the first result from a cursor declared by handle_declare/4
. Return
{:ok, result, state}
to return the result result
and continue,
{:deallocate, result, state}
to return the result result
and deallocate,
{:error, exception, state}
to return an error and close the cursor,
{:disconnect, exception, state}
to return an error and disconnect.
This callback is called in the client process.
Specs
handle_info(msg :: any, state :: any) ::
{:ok, new_state :: any} |
{:disconnect, Exception.t, new_state :: any}
Handle a message received by the connection process when checked in.
Return {:ok, state}
to continue or {:disconnect, exception,
state}
to disconnect.
Messages received by the connection process when checked out will be logged and discared.
This callback is called in the connection process.
Specs
handle_next(query, cursor, opts :: Keyword.t, state :: any) ::
{:ok | :deallocate, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Fetch the next result from a cursor declared by handle_declare/4
. Return
{:ok, result, state}
to return the result result
and continue,
{:deallocate, result, state}
to return the result result
and deallocate,
{:error, exception, state}
to return an error and close the cursor,
{:disconnect, exception, state}
to return an error and disconnect.
This callback is called in the client process.
Specs
handle_prepare(query, opts :: Keyword.t, state :: any) ::
{:ok, query, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Prepare a query with the database. Return {:ok, query, state}
where
query
is a query to pass to execute/4
or close/3
,
{:error, exception, state}
to return an error and continue or
{:disconnect, exception, state}
to return an error and disconnect.
This callback is intended for cases where the state of a connection is needed to prepare a query and/or the query can be saved in the database to call later.
This callback is called in the client process.
Specs
handle_rollback(opts :: Keyword.t, state :: any) ::
{:ok, result, new_state :: any} |
{:error | :disconnect, Exception.t, new_state :: any}
Handle rolling back a transaction. Return {:ok, result, state}
on success
and to continue, {:error, exception, state}
to abort the transaction
and continue or {:disconnect, exception, state}
to abort the
transaction and disconnect.
A transaction will be rolled back if an exception occurs or
rollback/2
is called.
This callback is called in the client process.
Specs
ping(state :: any) ::
{:ok, new_state :: any} |
{:disconnect, Exception.t, new_state :: any}
Called when the connection has been idle for a period of time. Return
{:ok, state}
to continue or {:disconnect, exception}
to
disconnect.
This callback is called if no callbacks have been called after the
idle timeout and a client process is not using the state. The idle
timeout can be configured by the :idle_timeout
option. This function
can be called whether the connection is checked in or checked out.
This callback is called in the connection process.