Mongo.BulkWrite (mongodb-driver v1.4.0)

The driver supports bulk writes (see the MongoDB specification).

The motivation for bulk writes is performance: operations of the same kind can be grouped and sent to the server in fewer round trips. The driver supports

  • unordered and ordered bulk writes
  • in-memory and stream bulk writes

Unordered bulk writes

Unordered bulk writes offer the greatest optimization potential. All operations are divided into three groups (inserts, updates and deletes). The order of execution within a group does not matter; however, the groups are always executed in the order inserts, updates, deletes. The following example creates three documents, modifies them, and then deletes them all, leaving the collection unchanged. This works because of the execution order:

  1. inserts
  2. updates
  3. deletes

Example:

alias Mongo.BulkWrite
alias Mongo.UnorderedBulk

bulk = "bulk"
    |> UnorderedBulk.new()
    |> UnorderedBulk.insert_one(%{name: "Greta"})
    |> UnorderedBulk.insert_one(%{name: "Tom"})
    |> UnorderedBulk.insert_one(%{name: "Waldo"})
    |> UnorderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
    |> UnorderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
    |> UnorderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
    |> UnorderedBulk.delete_one(%{kind: "dog"})
    |> UnorderedBulk.delete_one(%{kind: "dog"})
    |> UnorderedBulk.delete_one(%{kind: "dog"})

result = BulkWrite.write(:mongo, bulk, w: 1)

Ordered bulk writes

Sometimes the order of execution is important for successive operations to yield the correct result. In that case, ordered bulk writes are used. The following example would not work as an unordered bulk write because the order within the update operations would be undefined: the update_many() only works if it is executed after the update_one() calls.

bulk = "bulk"
     |> OrderedBulk.new()
     |> OrderedBulk.insert_one(%{name: "Greta"})
     |> OrderedBulk.insert_one(%{name: "Tom"})
     |> OrderedBulk.insert_one(%{name: "Waldo"})
     |> OrderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
     |> OrderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
     |> OrderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
     |> OrderedBulk.update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
     |> OrderedBulk.delete_one(%{kind: "cat"})
     |> OrderedBulk.delete_one(%{kind: "cat"})
     |> OrderedBulk.delete_one(%{kind: "cat"})

result = BulkWrite.write(:mongo, bulk, w: 1)

Stream bulk writes

In the examples above, the bulk is first filled with a few operations in memory and then written to the database. For a very large number of operations, or when importing very long files, this would put an unnecessary burden on main memory and could lead to resource problems.

For such cases you can use streams. Both unordered and ordered bulk writes can be combined with streams: you set the maximum size of the bulk, and once that number of operations has been accumulated, the bulk is sent to the database. This keeps the memory consumption of the task bounded.

In the following example we import 1,000,000 integers into MongoDB using the stream API.

For each number we create an insert operation with BulkOps.get_insert_one/1. The resulting stream is passed to UnorderedBulk.write, which returns a stream that accumulates the insert operations until the limit of 1_000 is reached; at that point the group of operations is written to MongoDB.

Example:

alias Mongo.BulkOps
alias Mongo.UnorderedBulk

1..1_000_000
  |> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
  |> UnorderedBulk.write(:mongo, "bulk", 1_000)
  |> Stream.run()
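
The same approach works with ordered bulk writes. The following is a minimal sketch, assuming Mongo.OrderedBulk.write/4 takes the same arguments as the unordered variant (topology, collection name and bulk size) while preserving the order of the operations:

alias Mongo.BulkOps
alias Mongo.OrderedBulk

# flush the accumulated operations to the database every 1_000 entries,
# keeping the relative order of the operations intact
1..1_000_000
  |> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
  |> OrderedBulk.write(:mongo, "bulk", 1_000)
  |> Stream.run()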

Benchmark

The following benchmark compares multiple Mongo.insert_one() calls with a stream using unordered bulk writes. Both tests insert documents into a replica set with w: 1.

alias Mongo.BulkOps
alias Mongo.UnorderedBulk

# `top` is the driver's topology process (for example :mongo)
Benchee.run(
  %{
    "inserts" => fn input ->
      input
      |> Enum.map(fn i -> %{number: i} end)
      |> Enum.each(fn doc -> Mongo.insert_one!(top, "bulk_insert", doc) end)
    end,
    "streams" => fn input ->
      input
      |> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
      |> UnorderedBulk.write(top, "bulk", 10_000)
      |> Stream.run()
    end
  },
  inputs: %{
    "Small" => Enum.to_list(1..10_000),
    "Medium" => Enum.to_list(1..100_000),
    "Bigger" => Enum.to_list(1..1_000_000)
  }
)

Result:

##### With input Bigger #####
Name              ips        average  deviation         median         99th %
streams        0.0885      0.188 min     ±0.00%      0.188 min      0.188 min
inserts       0.00777       2.14 min     ±0.00%       2.14 min       2.14 min

Comparison:
streams        0.0885
inserts       0.00777 - 11.39x slower +1.96 min

##### With input Medium #####
Name              ips        average  deviation         median         99th %
streams          1.00         1.00 s     ±8.98%         0.99 s         1.12 s
inserts        0.0764        13.09 s     ±0.00%        13.09 s        13.09 s

Comparison:
streams          1.00
inserts        0.0764 - 13.12x slower +12.10 s

##### With input Small #####
Name              ips        average  deviation         median         99th %
streams          8.26        0.121 s    ±30.46%        0.112 s         0.23 s
inserts          0.75         1.34 s     ±7.15%         1.29 s         1.48 s

Comparison:
streams          8.26
inserts          0.75 - 11.07x slower +1.22 s

The result is that using bulk writes is much faster: roughly 11x to 13x across all input sizes.


Functions

write(topology_pid, bulk, opts \\ [])

Executes unordered and ordered bulk writes.

Unordered bulk writes

The operations are grouped (inserts, updates and deletes). The order of execution is:

  1. inserts
  2. updates
  3. deletes

The execution order within the group is not preserved.
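
The following minimal sketch (using the :mongo topology from the examples above) illustrates this: although the delete is added to the bulk before the insert and the update, the insert group runs first, then the updates, then the deletes, so the collection ends up unchanged.

alias Mongo.BulkWrite
alias Mongo.UnorderedBulk

# operations are added in mixed order; on execution they are regrouped
# and run as inserts, then updates, then deletes
bulk = "bulk"
    |> UnorderedBulk.new()
    |> UnorderedBulk.delete_one(%{kind: "dog"})
    |> UnorderedBulk.insert_one(%{name: "Greta"})
    |> UnorderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})

result = BulkWrite.write(:mongo, bulk, w: 1)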

Ordered bulk writes

Consecutive operations of the same type are grouped and sent as one command. The overall order is preserved.

If a group (inserts, updates or deletes) exceeds the maxWriteBatchSize limit, it is split into chunks. Everything is done in memory, so this use case is limited by the available memory; for large workloads, streaming bulk writes are the better approach.