GenBatcher
GenBatcher is a simple and lightweight batching utility for Elixir.
Installation
This package can be installed by adding :gen_batcher to your list of
dependencies in mix.exs:
def deps do
  [
    {:gen_batcher, "~> 1.1.0"}
  ]
end
Documentation
For additional documentation, see HexDocs.
Usage
GenBatcher processes are easy to start and have a number of powerful
configuration options:
Child Spec Example
A GenBatcher can be started with a simple child spec:
opts = [
  name: :my_gen_batcher,
  flush_trigger: {:size, 3},
  batch_timeout: 30_000,
  handle_flush: fn items, _ ->
    items
    |> Enum.join(",")
    |> IO.puts()
  end
]
children = [
  {GenBatcher, opts}
]
Supervisor.start_link(children, strategy: :one_for_one)
Once started, items can be inserted into the GenBatcher:
GenBatcher.insert(:my_gen_batcher, "foo")
GenBatcher.insert(:my_gen_batcher, "bar")
And, once a flush condition has been met, a flush operation will be triggered:
GenBatcher.insert(:my_gen_batcher, "baz")
# Flush operation outputs "foo,bar,baz"
GenBatcher.insert(:my_gen_batcher, "foo")
# After 30 seconds pass...
# Flush operation outputs "foo"
Behaviour Example
A GenBatcher can also be started with any module that implements the
GenBatcher behaviour:
defmodule MyGenBatcher do
  use GenBatcher
  def start_link(opts \\ []) do
    GenBatcher.start_link(__MODULE__, opts)
  end
  @impl GenBatcher
  def handle_flush(items, _) do
    items
    |> Enum.join(",")
    |> IO.puts()
  end
  @impl GenBatcher
  def handle_insert(item, acc) do
    size = acc + byte_size(item)
    if size >= 9, do: :flush, else: {:cont, size}
  end
  @impl GenBatcher
  def initial_acc, do: 0
end
Again, items can be inserted into the GenBatcher once it starts:
GenBatcher.insert(MyGenBatcher, "foo")
GenBatcher.insert(MyGenBatcher, "bar")
And, again, a flush operation will be triggered once a flush condition is met:
GenBatcher.insert(MyGenBatcher, "baz")
# Flush operation outputs "foo,bar,baz"
Flushing
By default, flush operations are asynchronous to both the caller and the
GenBatcher partition. In practice, this is achieved by making use of the
Task.Supervisor module. However, this can be configured with the
:blocking_flush? option for GenBatcher.start_link/2. If set to true, the
partition process will perform the flush operation instead of delegating to a
Task. This can be useful for applying backpressure and ensuring the system
isn't completely flooded.
A GenBatcher with an extremely cheap flush operation might see higher
throughput with blocking flushes, but this is generally not the case.
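The difference between the two modes can be sketched in plain Elixir. The module below is hypothetical and is not GenBatcher's implementation. FlushStrategySketch, flush/3, and the :task_supervisor option are invented for illustration, and handle_flush takes one argument here for brevity. Only the :blocking_flush? name mirrors the real option. In the non-blocking case, the flush is handed to a Task started under a Task.Supervisor. In the blocking case, the calling process runs the flush itself.

```elixir
defmodule FlushStrategySketch do
  # Hypothetical illustration, not GenBatcher's implementation.
  # :blocking_flush? mirrors the GenBatcher option; :task_supervisor is an
  # invented option naming the Task.Supervisor used for non-blocking flushes.
  def flush(items, handle_flush, opts) do
    if Keyword.get(opts, :blocking_flush?, false) do
      # Blocking: the calling process runs the flush itself and cannot take
      # new work until handle_flush returns, which applies backpressure.
      {:done, handle_flush.(items)}
    else
      # Non-blocking: the flush runs in a supervised task and the calling
      # process continues immediately.
      task_supervisor = Keyword.fetch!(opts, :task_supervisor)
      {:ok, pid} = Task.Supervisor.start_child(task_supervisor, fn -> handle_flush.(items) end)
      {:started, pid}
    end
  end
end

{:ok, task_supervisor} = Task.Supervisor.start_link()

# Blocking: the flush result is available as soon as flush/3 returns.
{:done, "foo,bar"} =
  FlushStrategySketch.flush(["foo", "bar"], &Enum.join(&1, ","), blocking_flush?: true)

# Non-blocking: only the task's pid comes back; the output is printed later.
{:started, _pid} =
  FlushStrategySketch.flush(["baz"], &IO.puts(Enum.join(&1, ",")), task_supervisor: task_supervisor)
```

The blocking branch trades latency for backpressure. The caller is tied up for the duration of every flush, which is why this mode tends to pay off only when flushes are very cheap.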
Complex Flush Triggers
A size condition and/or a timeout condition is sufficient for most use
cases. However, GenBatcher also supports defining custom item-based flush
triggers via the GenBatcher.handle_insert/2 and GenBatcher.initial_acc/0
callbacks. For example, these callbacks can be used to trigger a flush based on
byte size, as in the MyGenBatcher example above.
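The byte-size trigger from the MyGenBatcher example can be exercised without starting any processes. The sketch below copies that module's handle_insert/2 and initial_acc/0 logic into a plain module. It also adds a hypothetical run/1 driver, which is not part of GenBatcher, that feeds items through the callbacks. The driver makes two assumptions, both consistent with the earlier "foo,bar,baz" output. First, the item that triggers a flush is included in the flushed batch. Second, the accumulator restarts from initial_acc/0 afterwards.

```elixir
defmodule ByteSizeTriggerSketch do
  # Same decision logic as MyGenBatcher.handle_insert/2: flush once the
  # batch reaches 9 bytes.
  def handle_insert(item, acc) do
    size = acc + byte_size(item)
    if size >= 9, do: :flush, else: {:cont, size}
  end

  def initial_acc, do: 0

  # Hypothetical driver (not part of GenBatcher). Returns
  # {flushed_batches, pending_items}, assuming the triggering item belongs
  # to the flushed batch and the accumulator restarts from initial_acc/0.
  def run(items) do
    {flushed, pending, _acc} =
      Enum.reduce(items, {[], [], initial_acc()}, fn item, {flushed, pending, acc} ->
        # Batches are built in reverse and flipped when emitted.
        batch = [item | pending]

        case handle_insert(item, acc) do
          :flush -> {[Enum.reverse(batch) | flushed], [], initial_acc()}
          {:cont, new_acc} -> {flushed, batch, new_acc}
        end
      end)

    {Enum.reverse(flushed), Enum.reverse(pending)}
  end
end

ByteSizeTriggerSketch.run(["foo", "bar", "baz", "hi"])
# => {[["foo", "bar", "baz"]], ["hi"]}
```

Items left in the pending list, such as "hi" here, have not yet met the byte-size condition.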
In cases where an item-based flush trigger is temporarily delayed (see
GenBatcher.insert_all/3), the GenBatcher.handle_insert/2 callback will not
be called again until after a flush operation is triggered. This means that the
accumulator term is guaranteed to be in a valid state whenever this callback is
invoked.
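Because handle_insert/2 is not called again once it has asked for a flush, a callback only has to reason about accumulators that are still below its limits. The sketch below uses a hypothetical module and arbitrarily chosen limits. It combines an item-count limit and a byte-size limit in a single {count, bytes} accumulator. In a real GenBatcher, these functions would live in a module that calls use GenBatcher and would be marked with @impl GenBatcher, as in MyGenBatcher. That wiring is omitted here so the snippet runs on its own.

```elixir
defmodule CountOrBytesTrigger do
  # Hypothetical limits, chosen only for illustration.
  @max_items 100
  @max_bytes 1_024

  # The accumulator is {items_in_batch, bytes_in_batch}.
  def initial_acc, do: {0, 0}

  # Requests a flush as soon as either limit is reached; otherwise keeps
  # counting. Items are assumed to be binaries, as in MyGenBatcher.
  def handle_insert(item, {count, bytes}) do
    count = count + 1
    bytes = bytes + byte_size(item)

    if count >= @max_items or bytes >= @max_bytes do
      :flush
    else
      {:cont, {count, bytes}}
    end
  end
end

CountOrBytesTrigger.handle_insert("foo", CountOrBytesTrigger.initial_acc())
# => {:cont, {1, 3}}
```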
Partitioning
GenBatcher leverages Elixir's PartitionSupervisor in order to support
partitioning. All of a GenBatcher's partitions collect items and flush
independently.
By default, GenBatcher uses a round-robin partitioner when inserting items.
However, the partitioner can be overridden with the :partition_key option for
GenBatcher.insert/3 and GenBatcher.insert_all/3, allowing for custom
partitioning strategies.
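With a custom :partition_key, the useful property is that equal keys always land in the same partition. The sketch below reproduces the key-routing rule documented for Elixir's PartitionSupervisor. Integer keys are routed with rem(abs(key), partitions), and any other term is routed with :erlang.phash2(key, partitions). PartitionKeySketch is a hypothetical name. The sketch also assumes that GenBatcher forwards :partition_key to PartitionSupervisor unchanged.

```elixir
defmodule PartitionKeySketch do
  # Mirrors PartitionSupervisor's documented key routing. Assumption:
  # GenBatcher passes :partition_key through to PartitionSupervisor as-is.
  def partition_for(key, partitions) when is_integer(key) do
    rem(abs(key), partitions)
  end

  def partition_for(key, partitions) do
    :erlang.phash2(key, partitions)
  end
end

# Partitions are numbered 0..partitions - 1, and the same key always maps to
# the same one, so items sharing a key are collected by the same partition.
PartitionKeySketch.partition_for(7, 5)
# => 2
```

With GenBatcher itself, the key is supplied per insert, for example GenBatcher.insert(:my_gen_batcher, "foo", partition_key: "user-42").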
All of a GenBatcher's partitions utilize the same flush conditions. This can
occasionally lead to bursts of flush operations being triggered at around the
same time. The GenBatcher.initial_acc/0 callback can be leveraged to
"jitter" item-based flush triggers in order to "desync" flush operations and
mitigate this issue. For example, the GenBatcher below enforces an absolute
maximum size of 1,000 items but randomly assigns each partition a maximum size
between 901 and 1,000 items:
defmodule MyJitteredGenBatcher do
  use GenBatcher
  def start_link(opts \\ []) do
    opts = Keyword.put(opts, :partitions, 5)
    GenBatcher.start_link(__MODULE__, opts)
  end
  @impl GenBatcher
  def handle_flush(items, _) do
    items
    |> Enum.join(",")
    |> IO.puts()
  end
  @impl GenBatcher
  def handle_insert(_, 1), do: :flush
  def handle_insert(_, acc), do: {:cont, acc - 1}
  @impl GenBatcher
  def initial_acc, do: 900 + :rand.uniform(100)
end
Shutdown
As long as a GenBatcher is shut down gracefully, it's guaranteed to flush all
inserted items. The last flush operation for each partition is always performed
by the partition itself, regardless of the :blocking_flush? option provided to
GenBatcher.start_link/2.
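One common OTP way to provide this kind of guarantee works as follows. A process that traps exits has terminate/2 invoked when its supervisor shuts it down gracefully, and also when it is stopped with GenServer.stop/1. That callback can flush whatever is still buffered. The module below is a hypothetical, minimal buffer that illustrates this pattern. It is not GenBatcher's code and has no size or timeout triggers.

```elixir
defmodule FlushOnShutdownSketch do
  # Hypothetical minimal buffer, not GenBatcher's implementation. Trapping
  # exits lets a graceful supervisor shutdown reach terminate/2, where any
  # pending items get one last flush.
  use GenServer

  def start_link(handle_flush), do: GenServer.start_link(__MODULE__, handle_flush)

  def insert(server, item), do: GenServer.call(server, {:insert, item})

  @impl GenServer
  def init(handle_flush) do
    Process.flag(:trap_exit, true)
    {:ok, {handle_flush, []}}
  end

  @impl GenServer
  def handle_call({:insert, item}, _from, {handle_flush, items}) do
    {:reply, :ok, {handle_flush, [item | items]}}
  end

  @impl GenServer
  def terminate(_reason, {handle_flush, items}) do
    # The final flush runs inside the buffer process itself.
    if items != [], do: handle_flush.(Enum.reverse(items))
    :ok
  end
end

parent = self()
{:ok, pid} = FlushOnShutdownSketch.start_link(fn items -> send(parent, {:flushed, items}) end)
:ok = FlushOnShutdownSketch.insert(pid, "foo")
:ok = FlushOnShutdownSketch.insert(pid, "bar")
:ok = GenServer.stop(pid)

receive do
  {:flushed, items} -> IO.puts(Enum.join(items, ","))
after
  1_000 -> IO.puts("no flush received")
end
# prints "foo,bar"
```

If the shutdown timeout expires, the supervisor kills the process outright and terminate/2 never runs. This is why an unlimited or generous shutdown time matters when data loss is unacceptable.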
By default, non-blocking flushes are given unlimited time to complete during
shutdown to ensure that data is not lost. However, if this behavior is not
desirable and data loss is acceptable, this can be managed with the :shutdown
option for GenBatcher.start_link/2.
ExBuffer
GenBatcher was created as a conceptual fork of the now-archived
ExBuffer package and is intended
to supersede it.