GenBatcher behaviour (GenBatcher v1.1.0)
A GenBatcher is a process that maintains a collection of items and performs
a user-defined flush operation on those items once an item-based condition is
met or a timeout is exceeded.
Types
@type partition_info() :: GenBatcher.Partition.Info.t()
Information about a GenBatcher partition.
For additional details, see GenBatcher.Partition.Info.
@type partition_key() :: term()
A routing key used for selecting a GenBatcher partition.
For additional details, see PartitionSupervisor.
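As a rough illustration of how a routing key can map to a partition, the sketch below models the default routing described in the PartitionSupervisor documentation: integer keys use rem(abs(key), partitions) and all other terms use :erlang.phash2(key, partitions). The PartitionRouting module and its index/2 function are hypothetical names for this example, and whether GenBatcher applies anything on top of this routing is not stated here.

```elixir
defmodule PartitionRouting do
  # Illustrative model of PartitionSupervisor's documented default routing.
  # It does not call into GenBatcher or PartitionSupervisor.

  # Integer keys are routed by their absolute value modulo the partition count.
  def index(key, partitions) when is_integer(key) and partitions > 0 do
    rem(abs(key), partitions)
  end

  # Any other term is hashed into the range 0..partitions-1.
  def index(key, partitions) when partitions > 0 do
    :erlang.phash2(key, partitions)
  end
end

PartitionRouting.index(5, 4)
# => 1
PartitionRouting.index(-5, 4)
# => 1
```

Because the same key always yields the same index, items inserted with the same partition_key/0 would be expected to land in the same partition.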
@type t() :: PartitionSupervisor.name()
An identifier for a GenBatcher process.
Callbacks
@callback handle_flush(items :: list(), info :: partition_info()) :: term()
The function invoked to flush a GenBatcher partition.
The first argument is a list/0 of inserted items and the second argument
is a partition_info/0 struct.
This callback can return any term/0 as the return value is disregarded.
This callback is required.
@callback handle_insert(item :: term(), acc :: term()) :: {:cont, term()} | :flush
The function invoked when an item is inserted into a GenBatcher partition.
The first argument is the inserted term/0 and the second argument is the
accumulator term/0.
This callback should return an updated accumulator term/0 wrapped in a
:cont tuple to continue collecting items or :flush to trigger a flush.
This callback is optional and the default implementation always returns
{:cont, acc} where acc is the accumulator term/0.
@callback initial_acc() :: term()
The function invoked to generate the initial accumulator term/0.
The initial accumulator is refreshed with this callback after every flush operation.
This callback is optional and the default implementation simply returns nil.
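To make the three callbacks concrete, here is a minimal sketch of a module that requests a flush once the inserted binaries add up to a byte limit. The module name ByteLimitBatcher and the 1024-byte limit are hypothetical, and the module is written as plain functions so that it runs on its own; in a real project it would also declare the GenBatcher behaviour.

```elixir
defmodule ByteLimitBatcher do
  # Hypothetical example module. The function shapes follow the callback
  # descriptions above; the behaviour declaration is omitted so the module
  # compiles without the library.

  @max_bytes 1_024

  # Each batch starts with zero accumulated bytes.
  def initial_acc, do: 0

  # Keep collecting until the running total reaches @max_bytes, then
  # request a flush.
  def handle_insert(item, acc) when is_binary(item) do
    total = acc + byte_size(item)

    if total >= @max_bytes do
      :flush
    else
      {:cont, total}
    end
  end

  # The return value is disregarded, so this only reports the batch size.
  def handle_flush(items, _info) do
    IO.puts("flushing #{length(items)} items")
  end
end
```

Since the accumulator is refreshed with initial_acc/0 after every flush, handle_insert/2 does not need to reset the byte count itself.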
Functions
Dumps the contents of all partitions for the given GenBatcher to a nested
list, bypassing flush operations and refreshing the partitions.
The partitions' contents are returned in index order.
While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.
Options
A GenBatcher can be dumped with the following options:
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
@spec dump_partition(t(), partition_key(), keyword()) :: list()
Dumps the contents of the given GenBatcher partition to a list, bypassing a
flush operation and refreshing the partition.
While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.
Options
A GenBatcher partition can be dumped with the following options:
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
Flushes all partitions for the given GenBatcher.
While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.
Options
A GenBatcher can be flushed with the following options:
:async? - An optional boolean/0 denoting whether or not a partition should be flushed asynchronously. Defaults to true.
:concurrent? - An optional boolean/0 denoting whether or not partitions should flush concurrently. Only relevant if :async? is false and the given GenBatcher has more than 1 partition. Defaults to true.
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
@spec flush_partition(t(), partition_key(), keyword()) :: :ok
Flushes the given GenBatcher partition.
While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.
Options
A GenBatcher partition can be flushed with the following options:
@spec info(t(), keyword()) :: [partition_info()]
Returns information for all partitions for the given GenBatcher.
The partition_info/0 structs are returned in index order.
While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.
Options
Information about a GenBatcher can be retrieved with the following options:
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
Inserts an item into the given GenBatcher.
Warning
When inserting multiple items into a GenBatcher, using insert_all/3 is
far more efficient than inserting each item individually.
Options
An item can be inserted into a GenBatcher with the following options:
:partition_key - An optional partition_key/0 denoting which partition to insert the item into. If not specified, the partition is decided by the round-robin partitioner.
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
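The following sketch shows how the insert options might be used from application code. EventIngest, record/3, and record_many/3 are hypothetical names, and the argument order of the single-item insert call (batcher, item, options) is an assumption that mirrors the insert_all/3 spec. The module compiles without the library but can only be called with the gen_batcher dependency available.

```elixir
defmodule EventIngest do
  # Hypothetical wrapper module; `batcher` is the GenBatcher's identifier.

  # Use the user ID as the partition key so that one user's events are
  # routed to the same partition. Assumes insert takes (batcher, item, opts).
  def record(batcher, user_id, event) do
    GenBatcher.insert(batcher, event, partition_key: user_id, timeout: 5_000)
  end

  # When several items are already in hand, a single insert_all/3 call is
  # preferable to inserting them one at a time.
  def record_many(batcher, user_id, events) do
    GenBatcher.insert_all(batcher, events, partition_key: user_id)
  end
end
```

Omitting :partition_key in either call would leave the choice of partition to the round-robin partitioner.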
@spec insert_all(t(), Enumerable.t(), keyword()) :: non_neg_integer()
Inserts multiple items into the given GenBatcher and returns the number of
items inserted.
All of the given items are inserted into the same partition.
Options
Items can be inserted into a GenBatcher with the following options:
:partition_key - An optional partition_key/0 denoting which partition to insert the items into. If not specified, the partition is decided by the round-robin partitioner.
:safe? - An optional boolean/0 denoting whether or not to flush immediately after a flush condition is met. If true and a flush condition is met, a flush is triggered before inserting the remaining items. Otherwise, a flush is triggered once all items have been inserted. Setting this to false can improve throughput at the cost of allowing otherwise impossible batches. Only relevant if a flush would be triggered before all items are inserted. Defaults to true.
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
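The effect of :safe? can be illustrated with a toy model. The sketch below is not GenBatcher's implementation: SafeInsertModel and its insert_all/3 function are hypothetical, it assumes a size-based flush condition and an initially empty partition, and it returns the flushed batches alongside the items still buffered.

```elixir
defmodule SafeInsertModel do
  # Toy model only. Returns {flushed_batches, remaining_items} for a
  # size-based flush condition on an initially empty partition.

  # safe? = true: flush as soon as the condition is met, then keep
  # inserting the remaining items into a fresh buffer.
  def insert_all(items, max_size, true) do
    {flushed, buffer} =
      Enum.reduce(items, {[], []}, fn item, {flushed, buffer} ->
        buffer = [item | buffer]

        if length(buffer) >= max_size do
          {[Enum.reverse(buffer) | flushed], []}
        else
          {flushed, buffer}
        end
      end)

    {Enum.reverse(flushed), Enum.reverse(buffer)}
  end

  # safe? = false: insert everything first, then flush once if the
  # condition was met along the way.
  def insert_all(items, max_size, false) do
    items = Enum.to_list(items)

    if length(items) >= max_size do
      {[items], []}
    else
      {[], items}
    end
  end
end

SafeInsertModel.insert_all(1..5, 3, true)
# => {[[1, 2, 3]], [4, 5]}
SafeInsertModel.insert_all(1..5, 3, false)
# => {[[1, 2, 3, 4, 5]], []}
```

In the second call the single batch holds five items even though the limit is three, which is the kind of otherwise impossible batch the option description warns about.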
@spec partition_info(t(), partition_key(), keyword()) :: partition_info()
Returns information for the given GenBatcher partition.
While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.
Options
Information about a GenBatcher partition can be retrieved with the following
options:
:timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
@spec start_link(module() | keyword()) :: Supervisor.on_start()
Starts a GenBatcher process linked to the current process.
For more information, see start_link/2.
@spec start_link(module() | nil, keyword()) :: Supervisor.on_start()
Starts a GenBatcher process linked to the current process.
A GenBatcher can be started with a module/0 that implements the
GenBatcher behaviour, an opts keyword/0, or a combination of the two.
Options
A GenBatcher can be started with the following options:
:batch_timeout - An optional timeout/0 denoting the maximum amount of time (in ms) allowed between flushes. Defaults to :infinity.
:blocking_flush? - An optional boolean/0 denoting whether or not a flush should block a partition process. This can be useful for applying backpressure. Defaults to false.
:flush_empty? - An optional boolean/0 denoting whether or not an empty partition should be flushed. Defaults to false.
:flush_meta - An optional term/0 to be included in partition_info/0 structs. Defaults to nil.
:flush_trigger - An optional specification for an item-based flush condition that can take any of the following forms:
    {:size, max_size} where max_size is a pos_integer/0 denoting the maximum number of items allowed in a partition before a flush is triggered.
    {:static_custom, init_acc, handle_insert} where init_acc is a term/0 used as the initial accumulator and handle_insert is a function invoked when an item is inserted into a partition. For more information, see handle_insert/2.
    {:dynamic_custom, init_acc_fun, handle_insert} where init_acc_fun is a function invoked to generate an initial accumulator and handle_insert is a function invoked when an item is inserted into a partition. For more information about these functions, see initial_acc/0 and handle_insert/2 respectively.
  If a module/0 is provided and :flush_trigger is not explicitly specified, {:dynamic_custom, init_acc_fun, handle_insert} will be used where init_acc_fun and handle_insert are that module's initial_acc/0 and handle_insert/2 implementations respectively.
  If :flush_trigger is not specified and a module/0 is not provided, the GenBatcher will not have an item-based flush condition (meaning that inserting items will never trigger a flush).
:handle_flush - A function invoked to flush a partition. For more information, see handle_flush/2. When a module/0 is provided, this field will be overridden by that module's handle_flush/2 implementation. If a module/0 is not provided, :handle_flush is required.
:name - An optional identifier for a GenBatcher. For more information, see t/0. If a module/0 is provided, that module's name will be used unless otherwise specified. Defaults to GenBatcher.
:ordering - An optional ordering scheme (:fifo or :lifo) for buffered items. Defaults to :fifo. If the ordering of items is unimportant for a flush operation, :lifo is slightly more efficient.
:partitions - An optional pos_integer/0 denoting the number of partitions. Defaults to 1. For more information, see Partitioning.
:shutdown - An optional timeout/0 denoting the maximum amount of time (in ms) to allow flushes to finish on shutdown or :brutal_kill if flushes should be stopped immediately. Defaults to :infinity. For more information, see Shutdown.
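As a minimal sketch of the options above, the following builds a keyword list for a function-based GenBatcher with no callback module. BatcherConfig, log_batch/2, and the :event_batcher name are hypothetical, and the specific values are chosen only for illustration.

```elixir
defmodule BatcherConfig do
  # Hypothetical helper module; only the option keys come from the list above.

  # Builds start options for a GenBatcher that has no callback module.
  def opts do
    [
      # Hypothetical name used to address the batcher later.
      name: :event_batcher,
      # Required here because no module is provided.
      handle_flush: &__MODULE__.log_batch/2,
      # Flush once a partition holds 100 items...
      flush_trigger: {:size, 100},
      # ...or once 5 seconds have passed since the last flush.
      batch_timeout: 5_000,
      partitions: 2
    ]
  end

  # Receives the flushed items and a partition info struct. The return
  # value is disregarded.
  def log_batch(items, _info) do
    IO.puts("flushed #{length(items)} items")
  end
end

# With the library available, the batcher could then be started with:
#   {:ok, _pid} = GenBatcher.start_link(BatcherConfig.opts())
```

Keeping the options in a function makes it easy to reuse the same configuration in a supervision tree and in tests.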