Mongo.Session (mongodb-driver v0.7.4)

This module implements the details of the transactions API (see specs).

With MongoDB 3.6 or greater, the driver uses a session for each operation. If no session is created explicitly, the driver creates a so-called implicit session. A session is identified by a UUID that is added to some operations. Sessions are also used to manage the transaction state. In most situations you do not need to create a session instance yourself, so the API of the driver is unchanged.

If you run multiple insert statements, you can use a transaction (MongoDB 4.x) to make sure that all operations are grouped and behave like a single operation. Transactions have the following prerequisites: MongoDB 4.x must be deployed as a replica set or a cluster. The collections used in the operations must already exist. Some operations are not allowed (for example, creating an index or calling count).

Example

alias Mongo.Session

{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)

Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)

:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)

First you start an explicit session and a transaction. Pass the session to each insert statement as an option with the key :session; otherwise the insert statement will not be executed in the transaction. After that you commit the transaction and end the session by calling end_session.
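The steps above leave open what happens when one of the inserts fails. The following is a minimal sketch, not part of the driver, that aborts the transaction on the first failed insert and always ends the session. The module name DogsTx and the function insert_all/2 are hypothetical, and the sketch assumes that Mongo.insert_one returns either {:ok, result} or {:error, reason}.

```elixir
defmodule DogsTx do
  alias Mongo.Session

  # Hypothetical helper: inserts every name into "dogs" in one transaction.
  # `top` is the topology pid used in the examples on this page.
  def insert_all(top, names) do
    {:ok, session} = Session.start_session(top, :write, [])

    try do
      :ok = Session.start_transaction(session)

      # Stop at the first insert that reports an error (assumed return shape).
      result =
        Enum.reduce_while(names, :ok, fn name, :ok ->
          case Mongo.insert_one(top, "dogs", %{name: name}, session: session) do
            {:ok, _} -> {:cont, :ok}
            {:error, reason} -> {:halt, {:error, reason}}
          end
        end)

      case result do
        :ok ->
          Session.commit_transaction(session)

        {:error, _} = error ->
          # Roll back everything written so far in this transaction.
          Session.abort_transaction(session)
          error
      end
    after
      # Runs whether the transaction was committed, aborted, or an error was raised.
      Session.end_session(top, session)
    end
  end
end
```

A call such as DogsTx.insert_all(top, ["Greta", "Waldo", "Tom"]) would then return the result of the commit, or the error of the first failed insert.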

Convenient API for Transactions

The with_transaction function is responsible for starting a transaction, invoking a callback, and committing the transaction. The callback is expected to execute one or more operations within the transaction; however, that is not enforced. The callback is allowed to execute other operations not associated with the transaction.

Example

alias Mongo.InsertOneResult
alias Mongo.Session

{:ok, ids} = Session.with_transaction(top, fn opts ->
  {:ok, %InsertOneResult{inserted_id: id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
  {:ok, %InsertOneResult{inserted_id: id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
  {:ok, %InsertOneResult{inserted_id: id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
  {:ok, [id1, id2, id3]}
end, w: 1)

If the callback succeeds, it returns a tuple with the atom :ok and a user-defined result such as {:ok, [id1, id2, id3]}. In this example we use the write concern w: 1. The driver removes the write concern from the individual insert operations and applies it to the commit transaction command instead.

Implicit vs explicit sessions

In most cases the driver creates implicit sessions for you. Each time you run a query or a command, the driver executes the following functions:

with {:ok, session} <- Session.start_implicit_session(topology_pid, type, opts),
   result <- exec_command_session(session, new_cmd, opts),
   :ok <- Session.end_implict_session(topology_pid, session) do
...

This behaviour is specified by the MongoDB specification for drivers.

If you use the :causal_consistency flag, then you need to create an explicit session:

alias Mongo.Session

{:ok, session} = Session.start_session(top, :write, causal_consistency: true)

Mongo.delete_many(top, "dogs", %{name: "Greta"}, session: session)
{:ok, 0} = Mongo.count(top, "dogs", %{name: "Greta"}, session: session)

:ok = Session.end_session(top, session)

For more information about causal consistency, see the official documentation.

If you want to use transactions, then you need to create a session as well:

alias Mongo.Session

{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)

Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)

:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)

You can shorten this code by using the with_transaction function:

alias Mongo.InsertOneResult
alias Mongo.Session

{:ok, ids} = Session.with_transaction(top, fn opts ->
  {:ok, %InsertOneResult{inserted_id: id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
  {:ok, %InsertOneResult{inserted_id: id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
  {:ok, %InsertOneResult{inserted_id: id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
  {:ok, [id1, id2, id3]}
end, w: 1)

Summary

Functions

Abort the current transaction and roll back all changes.

Advance the operationTime for causally consistent read commands.

Check if the session is alive.

Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically.

Commit the current transaction.

Return the connection used in the session.

End implicit session. There is no need to call this function directly. It is called automatically.

End explicit session.

Return the server session used in the session.

Start a new implicit session only if no explicit session exists. It returns the session in the opts keyword list or creates a new one.

Start a new session for the topology_pid. You need to specify the type: :read for read and :write for write operations.

Start a new transaction.

Update the recoveryToken after each response from mongos.

Update the operationTime for causally consistent read commands. There is no need to call this function directly. It is called automatically.

Convenient function for running multiple write commands in a transaction.

Types

Functions

Specs

abort_transaction(t()) :: :ok | {:error, term()}

Abort the current transaction and roll back all changes.

advance_operation_time(pid, timestamp)

Specs

advance_operation_time(t(), BSON.Timestamp.t()) :: none()

Advance the operationTime for causally consistent read commands.

Check if the session is alive.

Specs

bind_session(t(), BSON.document()) :: :ok | {:error, term()}

Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically.

Commit the current transaction.

commit_transaction(pid, start_time)

Specs

commit_transaction(t(), DateTime.t()) :: :ok | {:error, term()}

Specs

connection(t()) :: pid()

Return the connection used in the session.

end_implict_session(topology_pid, session)

Specs

end_implict_session(GenServer.server(), t()) :: :ok | :error

End implicit session. There is no need to call this function directly. It is called automatically.

end_session(topology_pid, session)

Specs

end_session(GenServer.server(), t()) :: :ok | :error

End explicit session.

handle_call_event(arg1, transaction, data)

handle_cast_event(arg, state, data)

init(topology, conn, server_session, type, wire_version, opts)

run_in_transaction(topology_pid, session, fun, start_time, opts)

select_server(pid, opts)

Specs

server_session(t()) :: Mongo.Session.ServerSession.t()
server_session(t()) :: boolean()

Return the server session used in the session.

start_implicit_session(topology_pid, type, opts)

Specs

start_implicit_session(GenServer.server(), atom(), keyword()) ::
  {:ok, t()} | {:error, term()}

Start a new implicit session only if no explicit session exists. It returns the session in the opts keyword list or creates a new one.

start_link(topology, conn, server_session, type, wire_version, opts)

Start the generic state machine.

start_session(topology_pid, type, opts \\ [])

Specs

start_session(GenServer.server(), atom(), keyword()) ::
  {:ok, t()} | {:error, term()}

Start a new session for the topology_pid. You need to specify the type: :read for read and :write for write operations.

Example

{:ok, session} = Session.start_session(top, :write, [])

Specs

start_transaction(t()) :: :ok | {:error, term()}

Start a new transaction.

update_recovery_token(pid, recovery_token)

Specs

update_recovery_token(t(), BSON.document()) :: none()

Update the recoveryToken after each response from mongos.

update_session(pid, doc, opts \\ [])

Specs

update_session(t(), %{key: BSON.Timestamp.t()}, keyword()) :: BSON.document()

Update the operationTime for causally consistent read commands. There is no need to call this function directly. It is called automatically.

with_transaction(topology_pid, fun, opts \\ [])

Convenient function for running multiple write commands in a transaction.

If a TransientTransactionError or an UnknownTransactionCommitResult occurs, the function retries the whole transaction or the commit of the transaction, respectively. You can specify a timeout (:transaction_retry_timeout_s) to limit how long it keeps retrying. The default value is 120 seconds. If you do not want to wait that long, call with_transaction with the option transaction_retry_timeout_s: 10. In this case the function returns an error after 10 seconds of retrying.

Example

alias Mongo.InsertOneResult
alias Mongo.Session

{:ok, ids} = Session.with_transaction(top, fn opts ->
  {:ok, %InsertOneResult{inserted_id: id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
  {:ok, %InsertOneResult{inserted_id: id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
  {:ok, %InsertOneResult{inserted_id: id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
  {:ok, [id1, id2, id3]}
end, transaction_retry_timeout_s: 10)
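To illustrate the idea of retrying until a time limit is reached, here is a small sketch in plain Elixir. It is not the driver's implementation: the module RetrySketch, the function retry_until/3, and the :transient error reason are all hypothetical names chosen for this illustration.

```elixir
defmodule RetrySketch do
  # Calls `fun` until it returns {:ok, _}, returns a non-retryable error,
  # or the time limit of `timeout_s` seconds is reached.
  def retry_until(fun, timeout_s, retryable \\ [:transient]) do
    deadline = System.monotonic_time(:millisecond) + timeout_s * 1000
    loop(fun, deadline, retryable)
  end

  defp loop(fun, deadline, retryable) do
    case fun.() do
      {:ok, _} = ok ->
        ok

      {:error, reason} = error ->
        # Retry only retryable errors, and only while there is time left.
        if reason in retryable and System.monotonic_time(:millisecond) < deadline do
          loop(fun, deadline, retryable)
        else
          error
        end
    end
  end
end

# A function that fails twice with a retryable error and then succeeds.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  if attempt < 3, do: {:error, :transient}, else: {:ok, attempt}
end

{:ok, 3} = RetrySketch.retry_until(flaky, 10)
```

Using a monotonic clock for the deadline keeps the limit unaffected by changes to the system time.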

From the specs:

The callback function may be executed multiple times

The implementation of with_transaction is based on the original examples for Retry Transactions and Commit Operation from the MongoDB Manual. As such, the callback may be executed any number of times. Drivers are free to encourage their users to design idempotent callbacks.
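Because the callback may run more than once, any side effect it performs outside the transaction can be repeated. The sketch below uses plain Elixir and no database to show the difference between a callback that is safe to repeat and one that is not. IdempotencySketch and run_repeatedly/2 are hypothetical stand-ins for a retrying runner.

```elixir
defmodule IdempotencySketch do
  # Stand-in for a retrying runner: invokes the callback `times` times
  # and returns the result of the last invocation.
  def run_repeatedly(callback, times) when times >= 1 do
    Enum.reduce(1..times, nil, fn _, _ -> callback.() end)
  end
end

{:ok, log} = Agent.start_link(fn -> [] end)
{:ok, seen} = Agent.start_link(fn -> MapSet.new() end)

# Not idempotent: every execution appends another entry.
append = fn -> Agent.update(log, &["greta" | &1]) end

# Idempotent: executing it again leaves the state unchanged.
put = fn -> Agent.update(seen, &MapSet.put(&1, "greta")) end

IdempotencySketch.run_repeatedly(append, 3)
IdempotencySketch.run_repeatedly(put, 3)

3 = length(Agent.get(log, & &1))
1 = MapSet.size(Agent.get(seen, & &1))
```

After three executions the appending callback has produced three entries, while the set-based callback still holds a single one.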