db_connection v1.1.0 DBConnection behaviour

A behaviour module for implementing efficient database connection client processes, pools and transactions.

DBConnection handles callbacks differently to most behaviours. Some callbacks will be called in the calling process, with the state copied to and from the calling process. This is useful when the data for a request is large and means that a calling process can interact with a socket directly.

A side effect of this is that query handling can be written in a simple blocking fashion, while the connection process itself will remain responsive to OTP messages and can enqueue and cancel queued requests.

If a request or series of requests takes too long to handle in the client process a timeout will trigger and the socket can be cleanly disconnected by the connection process.

If a calling process waits too long to start its request it will time out and its request will be cancelled. This prevents requests building up when the database cannot keep up.

If no requests are received for a period of time the connection will trigger an idle timeout and the database can be pinged to keep the connection alive.

Should the connection be lost, attempts will be made to reconnect with (configurable) exponential random backoff. All state is lost when a connection disconnects but the process is reused.
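The reconnect behaviour can be pictured with a small sketch of randomised exponential backoff. The module name BackoffSketch and its next_delay/3 function are hypothetical and not part of DBConnection, and the arithmetic is one common way to combine an exponential ceiling with a random delay, not necessarily what the library does internally. The lo and hi arguments play the role of the :backoff_min and :backoff_max options of start_link/2.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper, not DBConnection's implementation.
  # attempt: number of failed connection attempts so far (0, 1, 2, ...)
  # lo/hi: bounds in milliseconds, mirroring :backoff_min and :backoff_max
  def next_delay(attempt, lo \\ 1_000, hi \\ 30_000)
      when is_integer(attempt) and attempt >= 0 and lo > 0 and hi >= lo do
    # The ceiling doubles with every failed attempt until it reaches hi.
    ceiling = min(hi, lo * Integer.pow(2, attempt))
    # Pick a uniformly random delay between lo and the current ceiling.
    lo + :rand.uniform(ceiling - lo + 1) - 1
  end
end
```

Randomising the delay spreads out reconnect attempts when many connections lose the database at the same moment.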

The DBConnection.Query protocol provides utility functions so that queries can be prepared or encoded and results decoded without blocking the connection or pool.

By default DBConnection provides a single connection. However, the :pool option can be set to use a pool of connections. If a pool is used, the pool module must be passed as an option, unless the call is made inside a run/3 or transaction/3 fun using the run/transaction connection reference (t).

Summary

Types

t()

Run or transaction connection reference

Functions

Create a supervisor child specification for a pool of connections

Close a prepared query on a database connection and return {:ok, result} on success or {:error, exception} on error

Close a prepared query on a database connection and return the result. Raises an exception on error

Ensures the given pool applications have been started

Execute a prepared query with a database connection and return {:ok, result} on success or {:error, exception} if there was an error

Execute a prepared query with a database connection and return the result. Raises an exception on error

Prepare a query with a database connection for later execution and returns {:ok, query} on success or {:error, exception} if there was an error

Prepare a query with a database connection and return the prepared query. An exception is raised on error

Prepare a query and execute it with a database connection and return both the prepared query and the result, {:ok, query, result} on success or {:error, exception} if there was an error

Prepare a query and execute it with a database connection and return both the prepared query and result. An exception is raised on error

Create a stream that will prepare a query, execute it and stream results using a cursor

Rollback a transaction, does not return

Acquire a lock on a connection and run a series of requests on it. The result of the fun is returned inside an :ok tuple: {:ok, result}

Start and link to a database connection process

Create a stream that will execute a prepared query and stream results using a cursor

Acquire a lock on a connection and run a series of requests inside a transaction. The result of the transaction fun is returned inside an :ok tuple: {:ok, result}

Macros

Use DBConnection to set the behaviour and include default no-op implementations for ping/1 and handle_info/2

Callbacks

Checks in the state to the connection process. Return {:ok, state} to allow the checkin or {:disconnect, exception, state} to disconnect

Checks out the state from the connection process. Return {:ok, state} to allow the checkout or {:disconnect, exception, state} to disconnect

Connect to the database. Return {:ok, state} on success or {:error, exception} on failure

Disconnect from the database. Return :ok

Handle the beginning of a transaction. Return {:ok, result, state} to continue, {:error, exception, state} to abort the transaction and continue or {:disconnect, exception, state} to abort the transaction and disconnect

Close a query prepared by handle_prepare/3 with the database. Return {:ok, result, state} on success and to continue, {:error, exception, state} to return an error and continue, or {:disconnect, exception, state} to return an error and disconnect

Handle committing a transaction. Return {:ok, result, state} on success and to continue, {:error, exception, state} to abort the transaction and continue or {:disconnect, exception, state} to abort the transaction and disconnect

Deallocate a cursor declared by handle_declare/4 with the database. Return {:ok, result, state} on success and to continue, {:error, exception, state} to return an error and continue, or {:disconnect, exception, state} to return an error and disconnect

Declare a cursor using a query prepared by handle_prepare/3. Return {:ok, cursor, state} to start a cursor for a stream and continue, {:error, exception, state} to return an error and continue or {:disconnect, exception, state} to return an error and disconnect

Execute a query prepared by handle_prepare/3. Return {:ok, result, state} to return the result and continue, {:error, exception, state} to return an error and continue or {:disconnect, exception, state} to return an error and disconnect

Fetch the first result from a cursor declared by handle_declare/4. Return {:ok, result, state} to return the result and continue, {:deallocate, result, state} to return the result and deallocate, {:error, exception, state} to return an error and close the cursor, {:disconnect, exception, state} to return an error and disconnect

Handle a message received by the connection process when checked in. Return {:ok, state} to continue or {:disconnect, exception, state} to disconnect

Fetch the next result from a cursor declared by handle_declare/4. Return {:ok, result, state} to return the result and continue, {:deallocate, result, state} to return the result and deallocate, {:error, exception, state} to return an error and close the cursor, {:disconnect, exception, state} to return an error and disconnect

Prepare a query with the database. Return {:ok, query, state} where query is a query to pass to execute/4 or close/3, {:error, exception, state} to return an error and continue or {:disconnect, exception, state} to return an error and disconnect

Handle rolling back a transaction. Return {:ok, result, state} on success and to continue, {:error, exception, state} to abort the transaction and continue or {:disconnect, exception, state} to abort the transaction and disconnect

Called when the connection has been idle for a period of time. Return {:ok, state} to continue or {:disconnect, exception, state} to disconnect

Types

cursor :: any
params :: any
query :: any
result :: any
t :: %DBConnection{conn_mod: any, conn_ref: reference, pool_mod: module, pool_ref: any}

Run or transaction connection reference.

Functions

child_spec(conn_mod, opts, child_opts \\ [])

Specs

child_spec(module, opts :: Keyword.t, child_opts :: Keyword.t) :: Supervisor.Spec.spec

Create a supervisor child specification for a pool of connections.

See Supervisor.Spec for child options (child_opts).

close(conn, query, opts \\ [])

Specs

close(conn, query, opts :: Keyword.t) ::
  {:ok, result} |
  {:error, Exception.t}

Close a prepared query on a database connection and return {:ok, result} on success or {:error, exception} on error.

This function should be used to free resources held by the connection process and/or the database server.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (ignored when using a run/transaction connection, default: 15_000)
  • :log - A function to log information about a call, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)
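The three shapes accepted by :log can be illustrated with a small dispatcher. This is a sketch only: LogDispatch is a hypothetical module and says nothing about how DBConnection invokes the logger internally. In real use the entry would be a DBConnection.LogEntry struct; here any term stands in for it.

```elixir
defmodule LogDispatch do
  # Hypothetical dispatcher covering the three documented shapes of :log.

  # nil: logging is disabled.
  def dispatch(nil, _entry), do: :ok

  # 1-arity fun: called with the log entry.
  def dispatch(fun, entry) when is_function(fun, 1), do: fun.(entry)

  # {module, function, args}: the log entry is prepended to args.
  def dispatch({mod, fun, args}, entry) when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [entry | args])
  end
end
```

With the tuple form, `{MyLogger, :log, [:info]}` would result in a call shaped like `MyLogger.log(entry, :info)`, where MyLogger is a made-up module name.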

The pool and connection module may support other options. All options are passed to handle_close/3.

See prepare/3.
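Because close/3 exists to free resources, it is often paired with prepare/3 and execute/4 so that the prepared query is released even when execution fails. The sketch below shows one way to do that. PrepareOnce and its fetch function are hypothetical names, and the db argument is only there so the flow can be tried with a stand-in module; it defaults to DBConnection and uses only the prepare/3, execute/4 and close/3 calls documented on this page.

```elixir
defmodule PrepareOnce do
  # Hypothetical helper. `db` defaults to DBConnection and is a parameter
  # only so the flow can be exercised with a stand-in module.
  def fetch(conn, query, params, opts \\ [], db \\ DBConnection) do
    with {:ok, prepared} <- db.prepare(conn, query, opts) do
      try do
        # Returns {:ok, result} or {:error, exception}.
        db.execute(conn, prepared, params, opts)
      after
        # Always release the prepared query, whatever execute returned.
        db.close(conn, prepared, opts)
      end
    end
  end
end
```

If prepare/3 returns {:error, exception}, the with expression returns that tuple unchanged and close/3 is never called, since nothing was prepared.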

close!(conn, query, opts \\ [])

Specs

close!(conn, query, opts :: Keyword.t) :: result

Close a prepared query on a database connection and return the result. Raises an exception on error.

See close/3.

ensure_all_started(opts, type \\ :temporary)

Specs

ensure_all_started(opts :: Keyword.t, type :: atom) ::
  {:ok, [atom]} |
  {:error, atom}

Ensures the given pool applications have been started.

Options

execute(conn, query, params, opts \\ [])

Specs

execute(conn, query, params, opts :: Keyword.t) ::
  {:ok, result} |
  {:error, Exception.t}

Execute a prepared query with a database connection and return {:ok, result} on success or {:error, exception} if there was an error.

If the query is not prepared on the connection an attempt may be made to prepare it and then execute again.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (ignored when using a run/transaction connection, default: 15_000)
  • :log - A function to log information about a call, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)

The pool and connection module may support other options. All options are passed to handle_execute/4.

See prepare/3.

execute!(conn, query, params, opts \\ [])

Specs

execute!(conn, query, params, opts :: Keyword.t) :: result

Execute a prepared query with a database connection and return the result. Raises an exception on error.

See execute/4.

fetch(conn, fun, call, query, cursor, opts)
prepare(conn, query, opts \\ [])

Specs

prepare(conn, query, opts :: Keyword.t) ::
  {:ok, query} |
  {:error, Exception.t}

Prepare a query with a database connection for later execution and returns {:ok, query} on success or {:error, exception} if there was an error.

The returned query can then be passed to execute/4 and/or close/3.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (ignored when using a run/transaction connection, default: 15_000)
  • :log - A function to log information about a call, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)

The pool and connection module may support other options. All options are passed to handle_prepare/3.

Example

query         = %Query{statement: "SELECT id FROM table"}
{:ok, query}  = DBConnection.prepare(conn, query)
{:ok, result} = DBConnection.execute(conn, query, [])
{:ok, _}      = DBConnection.close(conn, query)

prepare!(conn, query, opts \\ [])

Specs

prepare!(conn, query, opts :: Keyword.t) :: query

Prepare a query with a database connection and return the prepared query. An exception is raised on error.

See prepare/3.

prepare_execute(conn, query, params, opts \\ [])

Specs

prepare_execute(conn, query, params, Keyword.t) ::
  {:ok, query, result} |
  {:error, Exception.t}

Prepare a query and execute it with a database connection and return both the prepared query and the result, {:ok, query, result} on success or {:error, exception} if there was an error.

The returned query can be passed to execute/4 and close/3.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (ignored when using a run/transaction connection, default: 15_000)
  • :log - A function to log information about a call, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)

Example

query                = %Query{statement: "SELECT id FROM table WHERE id=$1"}
{:ok, query, result} = DBConnection.prepare_execute(conn, query, [1])
{:ok, result2}       = DBConnection.execute(conn, query, [2])
{:ok, _}             = DBConnection.close(conn, query)

prepare_execute!(conn, query, params, opts \\ [])

Prepare a query and execute it with a database connection and return both the prepared query and result. An exception is raised on error.

See prepare_execute/4.

prepare_stream(conn, query, params, opts)

Specs

prepare_stream(t, query, params, opts :: Keyword.t) :: DBConnection.PrepareStream.t

Create a stream that will prepare a query, execute it and stream results using a cursor.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (ignored when using a run/transaction connection, default: 15_000)
  • :log - A function to log information about a call, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)

The pool and connection module may support other options. All options are passed to handle_prepare/3, handle_close/3, handle_declare/4, handle_first/4, handle_next/4 and handle_deallocate/4.

Example

{:ok, results} = DBConnection.transaction(conn, fn(conn) ->
  query  = %Query{statement: "SELECT id FROM table"}
  stream = DBConnection.prepare_stream(conn, query, [])
  Enum.to_list(stream)
end)

rollback(conn, err)

Specs

rollback(t, reason :: any) :: no_return

Rollback a transaction, does not return.

Aborts the current transaction fun. If called inside nested transaction/3 calls, the rollback bubbles up to the top level.
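A function that "does not return" and bubbles up through nested calls can be modelled with throw and catch. The sketch below is a simplified, hypothetical model written in plain Elixir. RollbackSketch, its transaction/1, nested/2 and rollback/2 functions are illustrative names, and the model makes no claim about how DBConnection implements rollback/2 or about the failed-transaction rules described under transaction/3.

```elixir
defmodule RollbackSketch do
  # Hypothetical stand-in for a top-level transaction/3 call: runs `fun`
  # and converts a thrown rollback into {:error, reason}.
  def transaction(fun) when is_function(fun, 1) do
    ref = make_ref()

    try do
      {:ok, fun.(ref)}
    catch
      # Only catch rollbacks that belong to this transaction.
      :throw, {:rollback, ^ref, reason} -> {:error, reason}
    end
  end

  # Hypothetical stand-in for a nested call: it does not catch, so a
  # rollback thrown inside it passes straight through to the top level.
  def nested(ref, fun) when is_reference(ref) and is_function(fun, 1) do
    {:ok, fun.(ref)}
  end

  # Hypothetical stand-in for rollback/2: never returns to its caller.
  def rollback(ref, reason), do: throw({:rollback, ref, reason})
end
```

In this model, any code placed after the rollback call inside the fun is skipped, which matches the "never reaches here!" line in the documented example.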

Example

{:error, :bar} = DBConnection.transaction(conn, fn(conn) ->
  DBConnection.rollback(conn, :bar)
  IO.puts "never reaches here!"
end)

run(conn, fun, opts \\ [])

Specs

run(conn, (t -> result), opts :: Keyword.t) :: result when result: var

Acquire a lock on a connection and run a series of requests on it. The result of the fun is returned inside an :ok tuple: {:ok, result}.

To use the locked connection call the request with the connection reference passed as the single argument to the fun. If the connection disconnects all future calls using that connection reference will fail.

run/3 and transaction/3 can be nested multiple times but a transaction/3 call inside another transaction/3 will be treated the same as run/3.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (default: 15_000)

The pool may support other options.

Example

{:ok, res} = DBConnection.run(conn, fn(conn) ->
  DBConnection.execute!(conn, "SELECT id FROM table", [])
end)

start_link(conn_mod, opts)

Specs

start_link(module, opts :: Keyword.t) :: GenServer.on_start

Start and link to a database connection process.

Options

  • :pool - The DBConnection.Pool module to use (default: DBConnection.Connection)
  • :idle - The idle strategy, :passive to avoid checkin when idle and :active to checkin when idle (default: :passive)
  • :idle_timeout - The idle timeout to ping the database (default: 1_000)
  • :backoff_min - The minimum backoff interval (default: 1_000)
  • :backoff_max - The maximum backoff interval (default: 30_000)
  • :backoff_type - The backoff strategy, :stop for no backoff and to stop, :exp for exponential, :rand for random and :rand_exp for random exponential (default: :rand_exp)
  • :after_connect - A function to run on connect using run/3, either a 1-arity fun, {module, function, args} with DBConnection.t prepended to args or nil (default: nil)
  • :name - A name to register the started process (see the :name option in GenServer.start_link/3).

Example

{:ok, conn} = DBConnection.start_link(mod, [idle_timeout: 5_000])

stream(conn, query, params, opts \\ [])

Specs

stream(t, query, params, opts :: Keyword.t) :: DBConnection.Stream.t

Create a stream that will execute a prepared query and stream results using a cursor.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (ignored when using a run/transaction connection, default: 15_000)
  • :log - A function to log information about a call, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)

The pool and connection module may support other options. All options are passed to handle_declare/4, handle_first/4, handle_next/4 and handle_deallocate/4.

Example

{:ok, results} = DBConnection.transaction(conn, fn(conn) ->
  query  = %Query{statement: "SELECT id FROM table"}
  query  = DBConnection.prepare!(conn, query)
  stream = DBConnection.stream(conn, query, [])
  Enum.to_list(stream)
end)

transaction(conn, fun, opts \\ [])

Specs

transaction(conn, (conn -> result), opts :: Keyword.t) ::
  {:ok, result} |
  {:error, reason :: any} when result: var

Acquire a lock on a connection and run a series of requests inside a transaction. The result of the transaction fun is returned inside an :ok tuple: {:ok, result}.

To use the locked connection call the request with the connection reference passed as the single argument to the fun. If the connection disconnects all future calls using that connection reference will fail.

run/3 and transaction/3 can be nested multiple times. If a transaction is rolled back or a nested transaction fun raises the transaction is marked as failed. Any calls inside a failed transaction (except rollback/2) will raise until the outer transaction call returns. All running transaction/3 calls will return {:error, :rollback} if the transaction failed or connection closed and rollback/2 is not called for that transaction/3.

Options

  • :pool_timeout - The maximum time to wait for a reply when making a synchronous call to the pool (default: 5_000)
  • :queue - Whether to block waiting in an internal queue for the connection’s state (boolean, default: true)
  • :timeout - The maximum time that the caller is allowed to hold the connection’s state (default: 15_000)
  • :log - A function to log information about begin, commit and rollback calls made as part of the transaction, either a 1-arity fun, {module, function, args} with DBConnection.LogEntry.t prepended to args or nil. See DBConnection.LogEntry (default: nil)

The pool and connection module may support other options. All options are passed to handle_begin/2, handle_commit/2 and handle_rollback/2.

Example

{:ok, res} = DBConnection.transaction(conn, fn(conn) ->
  DBConnection.execute!(conn, "SELECT id FROM table", [])
end)

Macros

__using__()

Use DBConnection to set the behaviour and include default no-op implementations for ping/1 and handle_info/2.

Callbacks

checkin(state)

Specs

checkin(state :: any) ::
  {:ok, new_state :: any} |
  {:disconnect, Exception.t, new_state :: any}

Checks in the state to the connection process. Return {:ok, state} to allow the checkin or {:disconnect, exception, state} to disconnect.

This callback is called when the control of the state is passed back to the connection process. It should reverse any changes made in checkout/1.

This callback is called in the connection process.
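One way to picture checkin/1 reversing checkout/1 is a connection whose state holds a TCP socket. The sketch below is an assumption-laden illustration, not something prescribed by DBConnection: the module name PassiveWhileCheckedOut and the state shape %{sock: socket} are made up. It switches the socket to passive mode on checkout, so a client process can read from it directly, and back to active-once mode on checkin, so incoming data arrives as a message for handle_info/2.

```elixir
defmodule PassiveWhileCheckedOut do
  # Hypothetical callbacks; the state shape %{sock: socket} is an assumption.

  # Checked out: a client process will read from the socket itself, so stop
  # the socket from delivering data as messages to the connection process.
  def checkout(%{sock: sock} = state), do: set_active(sock, false, state)

  # Checked in: undo checkout/1 so the next incoming data is delivered as a
  # message to the connection process.
  def checkin(%{sock: sock} = state), do: set_active(sock, :once, state)

  defp set_active(sock, mode, state) do
    case :inet.setopts(sock, active: mode) do
      :ok ->
        {:ok, state}

      {:error, reason} ->
        # Follows the 3-tuple shape given in the callback specs.
        err = %RuntimeError{message: "setopts failed: #{inspect(reason)}"}
        {:disconnect, err, state}
    end
  end
end
```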

checkout(state)

Specs

checkout(state :: any) ::
  {:ok, new_state :: any} |
  {:disconnect, Exception.t, new_state :: any}

Checks out the state from the connection process. Return {:ok, state} to allow the checkout or {:disconnect, exception, state} to disconnect.

This callback is called when the control of the state is passed to another process. checkin/1 is called with the new state when control is returned to the connection process.

Messages are discarded, instead of being passed to handle_info/2, when the state is checked out.

This callback is called in the connection process.

connect(opts)

Specs

connect(opts :: Keyword.t) ::
  {:ok, state :: any} |
  {:error, Exception.t}

Connect to the database. Return {:ok, state} on success or {:error, exception} on failure.

If an error is returned it will be logged and another connection attempt will be made after a backoff interval.

This callback is called in the connection process.

disconnect(err, state)

Specs

disconnect(err :: Exception.t, state :: any) :: :ok

Disconnect from the database. Return :ok.

The exception as first argument is the exception from a :disconnect 3-tuple returned by a previous callback.

If the state is controlled by a client and it exits or takes too long to process a request the state will be the last known state. In these cases the exception will be a DBConnection.ConnectionError.

This callback is called in the connection process.

handle_begin(opts, state)

Specs

handle_begin(opts :: Keyword.t, state :: any) ::
  {:ok, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Handle the beginning of a transaction. Return {:ok, result, state} to continue, {:error, exception, state} to abort the transaction and continue or {:disconnect, exception, state} to abort the transaction and disconnect.

This callback is called in the client process.

handle_close(query, opts, state)

Specs

handle_close(query, opts :: Keyword.t, state :: any) ::
  {:ok, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Close a query prepared by handle_prepare/3 with the database. Return {:ok, result, state} on success and to continue, {:error, exception, state} to return an error and continue, or {:disconnect, exception, state} to return an error and disconnect.

This callback is called in the client process.

handle_commit(opts, state)

Specs

handle_commit(opts :: Keyword.t, state :: any) ::
  {:ok, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Handle committing a transaction. Return {:ok, result, state} on success and to continue, {:error, exception, state} to abort the transaction and continue or {:disconnect, exception, state} to abort the transaction and disconnect.

This callback is called in the client process.

handle_deallocate(query, cursor, opts, state)

Specs

handle_deallocate(query, cursor, opts :: Keyword.t, state :: any) ::
  {:ok, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Deallocate a cursor declared by handle_declare/4 with the database. Return {:ok, result, state} on success and to continue, {:error, exception, state} to return an error and continue, or {:disconnect, exception, state} to return an error and disconnect.

This callback is called in the client process.

handle_declare(query, params, opts, state)

Specs

handle_declare(query, params, opts :: Keyword.t, state :: any) ::
  {:ok, cursor, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Declare a cursor using a query prepared by handle_prepare/3. Return {:ok, cursor, state} to start a cursor for a stream and continue, {:error, exception, state} to return an error and continue or {:disconnect, exception, state} to return an error and disconnect.

This callback is called in the client process.

handle_execute(query, params, opts, state)

Specs

handle_execute(query, params, opts :: Keyword.t, state :: any) ::
  {:ok, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Execute a query prepared by handle_prepare/3. Return {:ok, result, state} to return the result and continue, {:error, exception, state} to return an error and continue or {:disconnect, exception, state} to return an error and disconnect.

This callback is called in the client process.

handle_first(query, cursor, opts, state)

Specs

handle_first(query, cursor, opts :: Keyword.t, state :: any) ::
  {:ok | :deallocate, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Fetch the first result from a cursor declared by handle_declare/4. Return {:ok, result, state} to return the result and continue, {:deallocate, result, state} to return the result and deallocate, {:error, exception, state} to return an error and close the cursor, {:disconnect, exception, state} to return an error and disconnect.

This callback is called in the client process.

handle_info(msg, state)

Specs

handle_info(msg :: any, state :: any) ::
  {:ok, new_state :: any} |
  {:disconnect, Exception.t, new_state :: any}

Handle a message received by the connection process when checked in. Return {:ok, state} to continue or {:disconnect, exception, state} to disconnect.

Messages received by the connection process when checked out will be logged and discarded.

This callback is called in the connection process.

handle_next(query, cursor, opts, state)

Specs

handle_next(query, cursor, opts :: Keyword.t, state :: any) ::
  {:ok | :deallocate, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Fetch the next result from a cursor declared by handle_declare/4. Return {:ok, result, state} to return the result and continue, {:deallocate, result, state} to return the result and deallocate, {:error, exception, state} to return an error and close the cursor, {:disconnect, exception, state} to return an error and disconnect.

This callback is called in the client process.
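The way the four cursor callbacks fit together can be sketched with an in-memory example. Both modules below are hypothetical. ListCursor treats the params list as the "rows" and serves them two at a time, and CursorLoop is a hand-written driver that exists only to show the calling order; in real use DBConnection's own streams drive these callbacks. The sketch assumes the caller invokes handle_deallocate/4 once after receiving a :deallocate tuple, and it does not handle :error or :disconnect returns.

```elixir
defmodule ListCursor do
  # Hypothetical callback module: state maps each cursor to its unread rows.
  @chunk 2

  def handle_declare(_query, params, _opts, state) do
    cursor = make_ref()
    {:ok, cursor, Map.put(state, cursor, params)}
  end

  def handle_first(query, cursor, opts, state), do: handle_next(query, cursor, opts, state)

  def handle_next(_query, cursor, _opts, state) do
    {rows, rest} = Enum.split(Map.fetch!(state, cursor), @chunk)
    state = Map.put(state, cursor, rest)
    # No rows left: tell the caller the cursor can be deallocated.
    if rest == [], do: {:deallocate, rows, state}, else: {:ok, rows, state}
  end

  def handle_deallocate(_query, cursor, _opts, state) do
    {:ok, :deallocated, Map.delete(state, cursor)}
  end
end

defmodule CursorLoop do
  # Hypothetical driver showing the order in which the callbacks are used.
  def drain(mod, query, params, state) do
    {:ok, cursor, state} = mod.handle_declare(query, params, [], state)
    {chunks, state} = fetch(mod, :handle_first, query, cursor, state, [])
    # Assumption: the caller deallocates once after seeing :deallocate.
    {:ok, _result, state} = mod.handle_deallocate(query, cursor, [], state)
    {chunks, state}
  end

  defp fetch(mod, fun, query, cursor, state, acc) do
    case apply(mod, fun, [query, cursor, [], state]) do
      {:ok, rows, state} ->
        # More rows may follow: every later fetch uses handle_next/4.
        fetch(mod, :handle_next, query, cursor, state, [rows | acc])

      {:deallocate, rows, state} ->
        {Enum.reverse([rows | acc]), state}
    end
  end
end
```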

handle_prepare(query, opts, state)

Specs

handle_prepare(query, opts :: Keyword.t, state :: any) ::
  {:ok, query, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Prepare a query with the database. Return {:ok, query, state} where query is a query to pass to execute/4 or close/3, {:error, exception, state} to return an error and continue or {:disconnect, exception, state} to return an error and disconnect.

This callback is intended for cases where the state of a connection is needed to prepare a query and/or the query can be saved in the database to call later.

This callback is called in the client process.
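The return shapes of the connect, prepare, execute and close callbacks can be seen together in a toy module. This is a sketch under stated assumptions: EchoConnection is a hypothetical name, it talks to no database, and it deliberately omits `use DBConnection` so that it compiles without the dependency. A real connection module would start with `use DBConnection` and implement the full callback list from this page.

```elixir
defmodule EchoConnection do
  # Hypothetical module: the state just remembers which queries are prepared.

  def connect(_opts), do: {:ok, %{prepared: MapSet.new()}}

  def disconnect(_err, _state), do: :ok

  def checkout(state), do: {:ok, state}

  def checkin(state), do: {:ok, state}

  # Remember the query so handle_execute/4 can check it was prepared.
  def handle_prepare(query, _opts, state) do
    {:ok, query, %{state | prepared: MapSet.put(state.prepared, query)}}
  end

  # "Executes" by echoing the query and params back as the result.
  def handle_execute(query, params, _opts, state) do
    if MapSet.member?(state.prepared, query) do
      {:ok, {query, params}, state}
    else
      {:error, %ArgumentError{message: "query is not prepared"}, state}
    end
  end

  # Forget the query, mirroring a database freeing a prepared statement.
  def handle_close(query, _opts, state) do
    {:ok, :closed, %{state | prepared: MapSet.delete(state.prepared, query)}}
  end
end
```

Every handle_* callback returns the (possibly updated) state as the last tuple element, which is how changes made in the client process find their way back to the connection process.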

handle_rollback(opts, state)

Specs

handle_rollback(opts :: Keyword.t, state :: any) ::
  {:ok, result, new_state :: any} |
  {:error | :disconnect, Exception.t, new_state :: any}

Handle rolling back a transaction. Return {:ok, result, state} on success and to continue, {:error, exception, state} to abort the transaction and continue or {:disconnect, exception, state} to abort the transaction and disconnect.

A transaction will be rolled back if an exception occurs or rollback/2 is called.

This callback is called in the client process.

ping(state)

Specs

ping(state :: any) ::
  {:ok, new_state :: any} |
  {:disconnect, Exception.t, new_state :: any}

Called when the connection has been idle for a period of time. Return {:ok, state} to continue or {:disconnect, exception, state} to disconnect.

This callback is called if no callbacks have been called after the idle timeout and a client process is not using the state. The idle timeout can be configured by the :idle_timeout option. This function can be called whether the connection is checked in or checked out.

This callback is called in the connection process.