Mongo.BulkWrite (mongodb-driver v1.4.1)
The driver supports so-called bulk writes (Specification).
The motivation for bulk writes is the ability to optimize execution by grouping operations of the same type. The driver supports:
- unordered and ordered bulk writes
- in-memory and stream bulk writes
Unordered bulk writes
Unordered bulk writes have the highest optimization factor. All operations are divided into three groups (inserts, updates, and deletes). The order of execution within a group does not matter, but the groups are always executed in this order: inserts, then updates, then deletes. The following example creates three documents, updates them, and then deletes all of them. After execution, the collection is unchanged. This works because of the execution order:
- inserts
- updates
- deletes
Example:
alias Mongo.BulkWrite
alias Mongo.UnorderedBulk
bulk = "bulk"
|> UnorderedBulk.new()
|> UnorderedBulk.insert_one(%{name: "Greta"})
|> UnorderedBulk.insert_one(%{name: "Tom"})
|> UnorderedBulk.insert_one(%{name: "Waldo"})
|> UnorderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
|> UnorderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
|> UnorderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
|> UnorderedBulk.delete_one(%{kind: "dog"})
|> UnorderedBulk.delete_one(%{kind: "dog"})
|> UnorderedBulk.delete_one(%{kind: "dog"})
result = BulkWrite.write(:mongo, bulk, w: 1)
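The call returns a Mongo.BulkWriteResult struct. A minimal sketch of inspecting it, assuming the struct exposes counters named inserted_count, modified_count, and deleted_count (check Mongo.BulkWriteResult for the exact field names):
# Counter field names are assumptions; consult Mongo.BulkWriteResult for the exact struct.
IO.inspect(result.inserted_count)  # 3 inserts
IO.inspect(result.modified_count)  # 3 updates
IO.inspect(result.deleted_count)   # 3 deletes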
Ordered bulk writes
Sometimes the order of execution is important for successive operations to yield a correct result.
In this case you use ordered bulk writes. The following example would not work with unordered bulk writes, because the order within the update operations is undefined: the update_many() call only works if it is executed after the update_one() calls.
bulk = "bulk"
|> OrderedBulk.new()
|> OrderedBulk.insert_one(%{name: "Greta"})
|> OrderedBulk.insert_one(%{name: "Tom"})
|> OrderedBulk.insert_one(%{name: "Waldo"})
|> OrderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
|> OrderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
|> OrderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
|> OrderedBulk.update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
|> OrderedBulk.delete_one(%{kind: "cat"})
|> OrderedBulk.delete_one(%{kind: "cat"})
|> OrderedBulk.delete_one(%{kind: "cat"})
result = BulkWrite.write(:mongo, bulk, w: 1)
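When the operations come from data rather than being written out by hand, the same bulk can be built with Enum.reduce. The sketch below uses only the functions already shown above; the list of names is made up for illustration:
names = ["Greta", "Tom", "Waldo"]

bulk =
  names
  |> Enum.reduce(OrderedBulk.new("bulk"), fn name, bulk ->
    # insert one document per name, in order
    OrderedBulk.insert_one(bulk, %{name: name})
  end)
  |> OrderedBulk.update_many(%{}, %{"$set": %{kind: "dog"}})

result = BulkWrite.write(:mongo, bulk, w: 1)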
Stream bulk writes
The examples shown so far fill the bulk with a few operations and then write it to the database. All of this happens in memory. For larger numbers of operations, or when importing very long files, this unnecessarily burdens main memory and can lead to resource problems.
For such cases you can use streams. Unordered and ordered bulk writes can both be combined with streams. You set the maximum size of the bulk; once that number of operations has been accumulated, the bulk is sent to the database. This keeps the memory consumption of the task bounded.
In the following example we import 1,000,000 integers into MongoDB using the stream API.
We need to create an insert operation (BulkOps.get_insert_one()) for each number. Then we call UnorderedBulk.write to import them. This function returns a stream that accumulates the insert operations until the limit of 1,000 is reached; at that point the group of operations is written to MongoDB.
Example:
alias Mongo.BulkOps
alias Mongo.UnorderedBulk
1..1_000_000
|> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
|> UnorderedBulk.write(:mongo, "bulk", 1_000)
|> Stream.run()
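The same pattern works for file imports. The sketch below assumes a hypothetical file numbers.txt with one integer per line and reuses only the functions from the example above; adjust the parsing to your actual file format:
alias Mongo.BulkOps
alias Mongo.UnorderedBulk

# numbers.txt is a made-up example file containing one integer per line.
"numbers.txt"
|> File.stream!()
|> Stream.map(&String.trim/1)
|> Stream.reject(&(&1 == ""))
|> Stream.map(fn line -> BulkOps.get_insert_one(%{number: String.to_integer(line)}) end)
|> UnorderedBulk.write(:mongo, "bulk", 1_000)
|> Stream.run()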
Benchmark
The following benchmark compares multiple Mongo.insert_one() calls with a stream using unordered bulk writes.
Both tests insert documents into a replica set with w: 1, where top is the database connection.
Benchee.run(
  %{
    "inserts" => fn input ->
      input
      |> Enum.map(fn i -> %{number: i} end)
      |> Enum.each(fn doc -> Mongo.insert_one!(top, "bulk_insert", doc) end)
    end,
    "streams" => fn input ->
      input
      |> Stream.map(fn i -> Mongo.BulkOps.get_insert_one(%{number: i}) end)
      |> Mongo.UnorderedBulk.write(top, "bulk", 10_000)
      |> Stream.run()
    end
  },
  inputs: %{
    "Small" => Enum.to_list(1..10_000),
    "Medium" => Enum.to_list(1..100_000),
    "Bigger" => Enum.to_list(1..1_000_000)
  }
)
Result:
##### With input Bigger #####
Name           ips        average  deviation      median      99th %
streams     0.0885      0.188 min     ±0.00%   0.188 min   0.188 min
inserts    0.00777       2.14 min     ±0.00%    2.14 min    2.14 min

Comparison:
streams     0.0885
inserts    0.00777 - 11.39x slower +1.96 min

##### With input Medium #####
Name           ips        average  deviation      median      99th %
streams       1.00         1.00 s     ±8.98%      0.99 s      1.12 s
inserts     0.0764        13.09 s     ±0.00%     13.09 s     13.09 s

Comparison:
streams       1.00
inserts     0.0764 - 13.12x slower +12.10 s

##### With input Small #####
Name           ips        average  deviation      median      99th %
streams       8.26        0.121 s    ±30.46%     0.112 s      0.23 s
inserts       0.75         1.34 s     ±7.15%      1.29 s      1.48 s

Comparison:
streams       8.26
inserts       0.75 - 11.07x slower +1.22 s
The result: bulk writes via streams are much faster than single inserts, roughly 11x to 13x faster across all three input sizes.
Functions
@spec write(GenServer.server(), Mongo.UnorderedBulk.t() | Mongo.OrderedBulk.t(), Keyword.t()) :: Mongo.BulkWriteResult.t()
Executes unordered and ordered bulk writes.
Unordered bulk writes
The operations are grouped (inserts, updates, deletes). The order of execution is:
- inserts
- updates
- deletes
The execution order within the group is not preserved.
Ordered bulk writes
Sequences of the same operations are grouped and sent as one command. The order is preserved.
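As an illustration of this grouping rule, a sketch using only the calls shown earlier on this page: an ordered bulk with interleaved operation types is split at every change of operation type.
alias Mongo.OrderedBulk

# Consecutive operations of the same type form one command. Per the grouping rule
# above, this bulk is sent as three commands: 2 inserts, then 1 update, then 1 insert.
bulk =
  "bulk"
  |> OrderedBulk.new()
  |> OrderedBulk.insert_one(%{name: "Greta"})
  |> OrderedBulk.insert_one(%{name: "Tom"})
  |> OrderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
  |> OrderedBulk.insert_one(%{name: "Waldo"})

Mongo.BulkWrite.write(:mongo, bulk, w: 1)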
If a group (inserts, updates, or deletes) exceeds the limit maxWriteBatchSize, it is split into chunks.
Everything is done in memory, so this use case is limited by the available memory. For large workloads, a better approach is to use streaming bulk writes.