Mongo.Session (mongodb-driver v1.5.0)
This module implements the details of the transactions API (see specs).
With MongoDB 3.6 or greater, the driver uses a session for each operation. If no session is created, the driver creates a so-called implicit session. A session is identified by a UUID, which is added to some operations. Sessions are also used to manage the transaction state. In most situations you do not need to create a session instance yourself, so the API of the driver is unchanged.
For multiple insert statements you can use a transaction (MongoDB 4.x) to ensure that all operations are grouped like a single operation. Prerequisites for transactions are: MongoDB 4.x must be deployed as a replica set or cluster. The collection used in the operations must already exist. Some operations are not allowed (for example, creating an index or calling count).
Example
alias Mongo.Session
{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)
Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)
:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)
First you start an explicit session and a transaction. Pass the session to each insert statement as an option with the key :session; otherwise the insert statement won't be executed in the transaction. After that you commit the transaction and end the session by calling end_session.
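The example above pattern-matches on success only. The following is a minimal sketch of how the same steps might be wrapped so that a failed insert rolls the transaction back. The module name DogsTransactionSketch and the function insert_dogs/2 are hypothetical, and the call Session.abort_transaction(session) assumes that the function described below as "Abort the current transaction and rollback all changes" has that name and arity.

```elixir
defmodule DogsTransactionSketch do
  # Hypothetical wrapper, not part of the driver.
  alias Mongo.Session

  # `top` is the topology pid, as in the example above.
  def insert_dogs(top, names) do
    {:ok, session} = Session.start_session(top, :write, [])
    :ok = Session.start_transaction(session)

    # Stop at the first insert that fails.
    result =
      Enum.reduce_while(names, :ok, fn name, :ok ->
        case Mongo.insert_one(top, "dogs", %{name: name}, session: session) do
          {:ok, _} -> {:cont, :ok}
          {:error, _} = error -> {:halt, error}
        end
      end)

    outcome =
      case result do
        :ok ->
          Session.commit_transaction(session)

        error ->
          # Assumption: abort_transaction/1 rolls back all changes.
          Session.abort_transaction(session)
          error
      end

    # End the session in both cases.
    Session.end_session(top, session)
    outcome
  end
end
```

Calling DogsTransactionSketch.insert_dogs(top, ["Greta", "Waldo", "Tom"]) would then return the result of the commit, or the first insert error after aborting.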
Convenient API for Transactions
This function is responsible for starting a transaction, invoking a callback, and committing the transaction. The callback is expected to execute one or more operations within the transaction; however, that is not enforced. The callback is allowed to execute other operations not associated with the transaction.
Example
{:ok, ids} = Mongo.transaction(top, fn ->
  {:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"})
  {:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"})
  {:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"})
  {:ok, [id1, id2, id3]}
end, w: 3)
If the callback is successful, it returns a tuple with the atom :ok and a user-defined result, such as {:ok, [id1, id2, id3]}. In this example we use the write concern w: 3. The driver removes the write concern from the insert operations and applies it to the commit transaction command instead.
Implicit vs explicit sessions
In most cases the driver will create implicit sessions for you. The session is put in the process dictionary under the key :session, and the opts are extended by :session as well. This behaviour is specified by the MongoDB specification for drivers.
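To illustrate the idea of a session living either in the options or in the process dictionary, here is a minimal sketch in plain Elixir. The module SessionLookupSketch is hypothetical and not part of the driver, the placeholder atoms stand in for real session pids, and the precedence shown (explicit option first, then the process dictionary) is an assumption made for illustration.

```elixir
defmodule SessionLookupSketch do
  # Hypothetical helper: prefer an explicit :session option, then fall back
  # to the process dictionary. Returns :none if no session is available.
  def lookup(opts) do
    case Keyword.get(opts, :session) || Process.get(:session) do
      nil -> :none
      session -> {:ok, session}
    end
  end
end

# No session anywhere: the caller would have to create one.
no_session = SessionLookupSketch.lookup([])

# A session stored in the process dictionary is found without any option.
Process.put(:session, :implicit_session)
from_process = SessionLookupSketch.lookup([])

# An explicit option wins over the process dictionary in this sketch.
from_opts = SessionLookupSketch.lookup(session: :explicit_session)
```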
If you use the :causal_consistency flag, then you need to create an explicit session:
alias Mongo.Session
{:ok, session} = Session.start_session(top, :write, causal_consistency: true)
Mongo.delete_many(top, "dogs", %{name: "Greta"}, session: session, w: :majority)
{:ok, 0} = Mongo.count(top, "dogs", %{name: "Greta"}, session: session, read_concern: %{level: :majority})
:ok = Session.end_session(top, session)
For more information about causal consistency see the official documentation.
Note that Mongo.Stream implements the Enumerable protocol and the reduce/3 function calls Mongo.Stream.checkin_session/3 after the stream is exhausted.
If you want to use transactions, then you need to create a session as well:
alias Mongo.Session
{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)
Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)
:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)
You can shorten this code by using the Mongo.transaction function:
{:ok, ids} = Mongo.transaction(top, fn ->
  {:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"})
  {:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"})
  {:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"})
  {:ok, [id1, id2, id3]}
end, w: 1)
Summary
Functions
Abort the current transaction and rollback all changes.
Advance the operationTime for causally consistent read commands.
Check if the session is alive.
Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically. The global session timeout is merged to the options as well.
Commit the current transaction.
Return the connection used in the session.
End explicit session.
This function allows nested in_session calls and provides a session if no session exists so far. A provided session lives in the process dictionary under :session or is specified in the opts.
Start the generic state machine.
Start a new session for the topology_pid. You need to specify the type: :read for read and :write for write operations.
Start a new transaction.
Update the recoveryToken after each response from mongos.
Update the operationTime for causally consistent read commands. There is no need to call this function directly. It is called automatically.
Return the wire_version used in the session.
Convenient function for running multiple write commands in a transaction.
Types
@type t() :: pid()
Functions
Abort the current transaction and rollback all changes.
@spec advance_operation_time(t(), BSON.Timestamp.t()) :: any()
Advance the operationTime for causally consistent read commands.
Check if the session is alive.
@spec bind_session(t(), BSON.document(), Keyword.t()) :: {:ok, pid(), BSON.document(), Keyword.t()} | {:error, term()}
Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically. The global session timeout is merged to the options as well.
Commit the current transaction.
@spec commit_transaction(t(), DateTime.t()) :: :ok | {:error, term()}
Return the connection used in the session.
@spec end_session(GenServer.server(), t()) :: :ok | :error
End explicit session.
This function allows nested in_session calls and provides a session if no session exists so far. A provided session lives in the process dictionary under :session or is specified in the opts.
start_link(topology, conn, address, server_session, wire_version, opts)
Start the generic state machine.
@spec start_session(GenServer.server(), atom(), keyword()) :: {:ok, t()} | {:error, term()}
Start a new session for the topology_pid. You need to specify the type: :read for read and :write for write operations.
Example
{:ok, session} = Session.start_session(top, :write, [])
Start a new transaction.
@spec update_recovery_token(t(), BSON.document()) :: any()
Update the recoveryToken after each response from mongos.
Update the operationTime for causally consistent read commands. There is no need to call this function directly. It is called automatically.
Return the wire_version used in the session.
Convenient function for running multiple write commands in a transaction.
In case of a TransientTransactionError or UnknownTransactionCommitResult, the function will retry the whole transaction or the commit of the transaction. You can specify a timeout (:transaction_retry_timeout_s) to limit how long the function keeps retrying. The default value is 120 seconds. If you don't want to wait that long, call with_transaction with the option transaction_retry_timeout_s: 10. In this case the function will return an error after 10 seconds of retrying.
Example
alias Mongo.Session
{:ok, ids} = Session.with_transaction(top, fn opts ->
  {:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
  {:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
  {:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
  {:ok, [id1, id2, id3]}
end, transaction_retry_timeout_s: 10)
From the specs:
The callback function may be executed multiple times
The implementation of with_transaction is based on the original examples for Retry Transactions and Commit Operation from the MongoDB Manual. As such, the callback may be executed any number of times.
Drivers are free to encourage their users to design idempotent callbacks.
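To make the retry behaviour concrete, here is a minimal sketch in plain Elixir of a retry loop with a time budget. It is not the driver's implementation: the module RetrySketch, the function with_retry/2 and the {:error, :transient} tuple are hypothetical stand-ins for with_transaction, :transaction_retry_timeout_s and a TransientTransactionError. The sketch retries immediately without any backoff.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of mongodb-driver.
  # Runs `fun` until it returns something other than {:error, :transient}
  # or until the time budget (in seconds) is used up.
  def with_retry(fun, timeout_s) when is_function(fun, 0) do
    deadline = System.monotonic_time(:millisecond) + timeout_s * 1000
    loop(fun, deadline)
  end

  defp loop(fun, deadline) do
    case fun.() do
      {:error, :transient} = error ->
        if System.monotonic_time(:millisecond) < deadline do
          # Still within the budget: run the whole callback again.
          loop(fun, deadline)
        else
          error
        end

      other ->
        other
    end
  end
end

# The callback fails twice with a transient error and then succeeds,
# so it is executed three times in total.
{:ok, counter} = Agent.start_link(fn -> 0 end)

callback = fn ->
  attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  if attempt < 3, do: {:error, :transient}, else: {:ok, attempt}
end

result = RetrySketch.with_retry(callback, 10)
calls = Agent.get(counter, & &1)
```

Because the callback runs once per attempt, any side effect outside the transaction (here, incrementing the counter) happens once per attempt as well, which is why idempotent callbacks are encouraged.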