View Source Mongo.Session (mongodb-driver v1.4.1)

This module implements the details of the transactions api (see specs).

In case of MongoDB 3.6 or greater the driver uses sessions for each operation. If no session is created the driver will create a so-called implicit session. A session is a UUID-Number which is added to some operations. The sessions are used to manage the transaction state as well. In most situation you need not to create a session instance, so the api of the driver is not changed.

In case of multiple insert statements you can use transaction (MongoDB 4.x) to be sure that all operations are grouped like a single operation. Prerequisites for transactions are: MongoDB 4.x must be used as replica set or cluster deployment. The collection used in the operations must already exist. Some operation are not allowed (For example: create index or call count).

Example

alias Mongo.Session

{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)

Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)

:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)

First you start a explicit session and a transactions. Use the session for each insert statement as an options with key :session otherwise the insert statement won't be executed in the transaction. After that you commit the transaction and end the session by calling end_session.

Convenient API for Transactions

This method is responsible for starting a transaction, invoking a callback, and committing a transaction. The callback is expected to execute one or more operations with the transaction; however, that is not enforced. The callback is allowed to execute other operations not associated with the transaction.

Example

{:ok, ids} = Mongo.transaction(top, fn ->
  {:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"})
  {:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"})
  {:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"})
  {:ok, [id1, id2, id3]}
end, w: 3)

If the callback is successful then it returns a tuple with the keyword :ok and a used defined result like {:ok, [id1, id2, id3]}. In this example we use the write concern w: 3. The write concern used in the insert operation will be removed by the driver. It is applied in the commit transaction command.

Implicit vs explicit sessions

In most cases the driver will create implicit sessions for you. The session is put in the process' dictionary under the key :session and the opts is extended by :session as well. This behaviour is specified by the mongodb specification for drivers.

If you use the :causal_consistency flag, then you need to create an explicit session:

alias Mongo.Session

{:ok, session} = Session.start_session(top, :write, causal_consistency: true)

Mongo.delete_many(top, "dogs", %{"Greta"}, session: session, w: :majority)
{:ok, 0} = Mongo.count(top, "dogs", %{name: "Greta"}, session: session, read_concern: %{level: :majority})

:ok = Session.end_session(top, session)

For more information about causal consistency see the officially documentation.

If you want to use transaction, then you need to create a session as well:

alias Mongo.Session

{:ok, session} = Session.start_session(top, :write, [])
:ok = Session.start_transaction(session)

Mongo.insert_one(top, "dogs", %{name: "Greta"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Waldo"}, session: session)
Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)

:ok = Session.commit_transaction(session)
:ok = Session.end_session(top, session)

You can shorten this code by using the with_transaction function:

{:ok, ids} = Mongo.transaction(top, fn ->
  {:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"})
  {:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"})
  {:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"})
  {:ok, [id1, id2, id3]}
end, w: 1)

Summary

Functions

Abort the current transaction and rollback all changes.

Advance the operationTime for causally consistent read commands

Check if the session is alive.

Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically. The global session timeout is merged to the options as well.

Commit the current transaction.

Return the connection used in the session.

End explicit session.

This function allows nested in_session calls and provides a session if no session exists so far. A provided session lives in the Process dictionary :session or is specified in the opts dictionary.

Start a new session for the topology_pid. You need to specify the type: :read for read and :write for write operations.

Start a new transaction.

Update the recoveryToken after each response from mongos

Update the operationTime for causally consistent read commands. There is no need to call this function directly. It is called automatically.

Return the wire_version used in the session.

Convenient function for running multiple write commands in a transaction.

Types

Functions

@spec abort_transaction(t()) :: :ok | {:error, term()}

Abort the current transaction and rollback all changes.

Link to this function

advance_operation_time(pid, timestamp)

View Source
@spec advance_operation_time(t(), BSON.Timestamp.t()) :: any()

Advance the operationTime for causally consistent read commands

@spec alive?(t()) :: boolean()

Check if the session is alive.

Link to this function

bind_session(pid, cmd, opts)

View Source
@spec bind_session(t(), BSON.document(), Keyword.t()) ::
  {:ok, pid(), BSON.document(), Keyword.t()} | {:error, term()}

Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically. The global session timeout is merged to the options as well.

Commit the current transaction.

Link to this function

commit_transaction(pid, start_time)

View Source
@spec commit_transaction(t(), DateTime.t()) :: :ok | {:error, term()}
@spec connection(t()) :: pid()

Return the connection used in the session.

Link to this function

end_session(topology_pid, session)

View Source
@spec end_session(GenServer.server(), t()) :: :ok | :error

End explicit session.

Link to this function

handle_call_event(arg1, transaction, data)

View Source
Link to this function

handle_cast_event(arg, state, data)

View Source
Link to this function

in_read_session(topology_pid, fun, opts)

View Source
Link to this function

in_session(session, topology_pid, read_write_type, fun, opts)

View Source
Link to this function

in_write_session(topology_pid, fun, opts)

View Source

This function allows nested in_session calls and provides a session if no session exists so far. A provided session lives in the Process dictionary :session or is specified in the opts dictionary.

Link to this function

init(topology, conn, address, server_session, wire_version, opts)

View Source
Link to this function

mark_server_unknown(pid)

View Source
Link to this function

run_in_transaction(topology_pid, session, fun, start_time, opts)

View Source
Link to this function

select_server(pid, opts)

View Source
Link to this function

start_link(topology, conn, address, server_session, wire_version, opts)

View Source

Start the generic state machine.

Link to this function

start_session(topology_pid, read_write_type, opts \\ [])

View Source
@spec start_session(GenServer.server(), atom(), keyword()) ::
  {:ok, t()} | {:error, term()}

Start a new session for the topology_pid. You need to specify the type: :read for read and :write for write operations.

Example

{:ok, session} = Session.start_session(top, :write, [])
@spec start_transaction(t()) :: :ok | {:error, term()}

Start a new transaction.

Link to this function

update_recovery_token(pid, recovery_token)

View Source
@spec update_recovery_token(t(), BSON.document()) :: any()

Update the recoveryToken after each response from mongos

Link to this function

update_session(pid, arg, opts \\ [])

View Source

Update the operationTime for causally consistent read commands. There is no need to call this function directly. It is called automatically.

@spec wire_version(t()) :: integer()

Return the wire_version used in the session.

Link to this function

with_transaction(topology_pid, fun, opts \\ [])

View Source
This function is deprecated. Use Mongo.transaction/3 instead.

Convenient function for running multiple write commands in a transaction.

In case of TransientTransactionError or UnknownTransactionCommitResult the function will retry the whole transaction or the commit of the transaction. You can specify a timeout (:transaction_retry_timeout_s) to limit the time of repeating. The default value is 120 seconds. If you don't wait so long, you call with_transaction with the option transaction_retry_timeout_s: 10. In this case after 10 seconds of retrying, the function will return an error.

Example

alias Mongo.Session

{:ok, ids} = Session.with_transaction(top, fn opts ->
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
{:ok, [id1, id2, id3]}
end, transaction_retry_timeout_s: 10)

From the specs:

The callback function may be executed multiple times

The implementation of with_transaction is based on the original examples for Retry Transactions and Commit Operation from the MongoDB Manual. As such, the callback may be executed any number of times. Drivers are free to encourage their users to design idempotent callbacks.