GenBatcher behaviour (GenBatcher v1.1.0)


A GenBatcher is a process that maintains a collection of items and performs a user-defined flush operation on those items once an item-based condition is met or a timeout is exceeded.

Summary

Types

partition_info()

Information about a GenBatcher partition.

partition_key()

A routing key used for selecting a GenBatcher partition.

t()

An identifier for a GenBatcher process.

Callbacks

handle_flush(items, info)

The function invoked to flush a GenBatcher partition.

handle_insert(item, acc)

The function invoked when an item is inserted into a GenBatcher partition.

initial_acc()

The function invoked to generate the initial accumulator term/0.

Functions

dump(gen_batcher, opts \\ [])

Dumps the contents of all partitions for the given GenBatcher to a nested list, bypassing flush operations and refreshing the partitions.

dump_partition(gen_batcher, partition_key, opts \\ [])

Dumps the contents of the given GenBatcher partition to a list, bypassing a flush operation and refreshing the partition.

flush(gen_batcher, opts \\ [])

Flushes all partitions for the given GenBatcher.

flush_partition(gen_batcher, partition_key, opts \\ [])

Flushes the given GenBatcher partition.

info(gen_batcher, opts \\ [])

Returns information for all partitions for the given GenBatcher.

insert(gen_batcher, item, opts \\ [])

Inserts an item into the given GenBatcher.

insert_all(gen_batcher, items, opts \\ [])

Inserts multiple items into the given GenBatcher and returns the number of items inserted.

partition_info(gen_batcher, partition_key, opts \\ [])

Returns information for the given GenBatcher partition.

start_link(arg)

Starts a GenBatcher process linked to the current process.

start_link(module, opts)

Starts a GenBatcher process linked to the current process.

Types

partition_info()

@type partition_info() :: GenBatcher.Partition.Info.t()

Information about a GenBatcher partition.

For additional details, see GenBatcher.Partition.Info.

partition_key()

@type partition_key() :: term()

A routing key used for selecting a GenBatcher partition.

For additional details, see PartitionSupervisor.

t()

@type t() :: PartitionSupervisor.name()

An identifier for a GenBatcher process.

Callbacks

handle_flush(items, info)

@callback handle_flush(items :: list(), info :: partition_info()) :: term()

The function invoked to flush a GenBatcher partition.

The first argument is a list/0 of inserted items and the second argument is a partition_info/0 struct.

This callback may return any term/0; the return value is disregarded.

This callback is required.
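A minimal callback module might implement only this required callback. The sketch below is illustrative: MyApp.LogBatcher is a hypothetical name, and it assumes the gen_batcher package is a project dependency so that @behaviour GenBatcher resolves.

```elixir
defmodule MyApp.LogBatcher do
  # Hypothetical callback module implementing only the required callback,
  # so the default handle_insert/2 and initial_acc/0 behaviour applies.
  @behaviour GenBatcher

  require Logger

  @impl GenBatcher
  def handle_flush(items, _info) do
    # The return value is disregarded, so this callback exists for its side effects.
    Logger.info("flushing #{length(items)} items")
  end
end
```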

handle_insert(item, acc)

(optional)
@callback handle_insert(item :: term(), acc :: term()) :: {:cont, term()} | :flush

The function invoked when an item is inserted into a GenBatcher partition.

The first argument is the inserted term/0 and the second argument is the accumulator term/0.

This callback should return either an updated accumulator term/0 wrapped in a :cont tuple, to continue collecting items, or :flush, to trigger a flush.

This callback is optional and the default implementation always returns {:cont, acc} where acc is the accumulator term/0.
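As an illustration, handle_insert/2 can implement a flush condition that a plain item count cannot express, such as total payload size. This is a sketch only: MyApp.ByteBatcher and the 1,024-byte limit are hypothetical, items are assumed to be binaries, and the gen_batcher package is assumed to be a dependency.

```elixir
defmodule MyApp.ByteBatcher do
  # Hypothetical callback module that requests a flush once the buffered
  # binaries reach a total of @max_bytes bytes.
  @behaviour GenBatcher

  @max_bytes 1_024

  # Start each batch with a byte count of 0 (instead of the default nil).
  @impl GenBatcher
  def initial_acc, do: 0

  @impl GenBatcher
  def handle_insert(item, bytes) when is_binary(item) do
    bytes = bytes + byte_size(item)

    # Either keep collecting with the updated count or request a flush.
    if bytes >= @max_bytes, do: :flush, else: {:cont, bytes}
  end

  @impl GenBatcher
  def handle_flush(items, _info), do: IO.puts("flushing #{length(items)} binaries")
end
```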

initial_acc()

(optional)
@callback initial_acc() :: term()

The function invoked to generate the initial accumulator term/0.

The initial accumulator is refreshed with this callback after every flush operation.

This callback is optional and the default implementation simply returns nil.
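Because the accumulator is regenerated by initial_acc/0 after every flush, it can be any term, not just a counter. The following sketch uses a MapSet to flush once items for three distinct keys have been buffered; MyApp.DistinctKeyBatcher, the {key, value} item shape, and the limit of three are hypothetical, and the gen_batcher package is assumed to be a dependency.

```elixir
defmodule MyApp.DistinctKeyBatcher do
  # Hypothetical callback module: items are assumed to be {key, value} tuples,
  # and a flush is requested once @max_keys distinct keys have been seen.
  @behaviour GenBatcher

  @max_keys 3

  # Builds a fresh, empty set of seen keys at start and after every flush.
  @impl GenBatcher
  def initial_acc, do: MapSet.new()

  @impl GenBatcher
  def handle_insert({key, _value}, seen) do
    seen = MapSet.put(seen, key)
    if MapSet.size(seen) >= @max_keys, do: :flush, else: {:cont, seen}
  end

  @impl GenBatcher
  def handle_flush(items, _info), do: IO.inspect(items, label: "flushed")
end
```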

Functions

dump(gen_batcher, opts \\ [])

@spec dump(
  t(),
  keyword()
) :: [list()]

Dumps the contents of all partitions for the given GenBatcher to a nested list, bypassing flush operations and refreshing the partitions.

The partitions' contents are returned in index order.

While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.

Options

A GenBatcher can be dumped with the following options:

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
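A typical use of dump/2 is inspecting buffered items in a test. This sketch assumes the Hex package is named :gen_batcher, that :dump_example is a free name, and that integer partition keys are routed the way PartitionSupervisor routes them (rem(abs(key), partitions)), so key 0 reaches the first partition and key 1 the second.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

# Two partitions and no :flush_trigger, so inserts never trigger a flush.
{:ok, _pid} =
  GenBatcher.start_link(
    name: :dump_example,
    partitions: 2,
    handle_flush: fn _items, _info -> :ok end
  )

:ok = GenBatcher.insert(:dump_example, :a, partition_key: 0)
:ok = GenBatcher.insert(:dump_example, :b, partition_key: 1)

# One list per partition, in index order; handle_flush/2 is not invoked.
[[:a], [:b]] = GenBatcher.dump(:dump_example)

# The dump refreshed both partitions, so they are now empty.
[[], []] = GenBatcher.dump(:dump_example)
```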

dump_partition(gen_batcher, partition_key, opts \\ [])

@spec dump_partition(t(), partition_key(), keyword()) :: list()

Dumps the contents of the given GenBatcher partition to a list, bypassing a flush operation and refreshing the partition.

While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.

Options

A GenBatcher partition can be dumped with the following options:

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
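For a single partition, dump_partition/3 takes the same partition_key/0 used when inserting. A sketch, assuming the :gen_batcher package name, a hypothetical :dump_partition_example name, and the default :fifo ordering preserving insertion order in the dump:

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

{:ok, _pid} =
  GenBatcher.start_link(
    name: :dump_partition_example,
    handle_flush: fn _items, _info -> :ok end
  )

2 = GenBatcher.insert_all(:dump_partition_example, [1, 2], partition_key: :user_42)

# With the default single partition, every key routes to the same partition.
[1, 2] = GenBatcher.dump_partition(:dump_partition_example, :user_42)
```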

flush(gen_batcher, opts \\ [])

@spec flush(
  t(),
  keyword()
) :: :ok

Flushes all partitions for the given GenBatcher.

While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.

Options

A GenBatcher can be flushed with the following options:

  • :async? - An optional boolean/0 denoting whether or not partitions should be flushed asynchronously. Defaults to true.

  • :concurrent? - An optional boolean/0 denoting whether or not partitions should flush concurrently. Only relevant if :async? is false and the given GenBatcher has more than 1 partition. Defaults to true.

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
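In tests, a synchronous flush avoids racing against the flush operation. The sketch below assumes the :gen_batcher package name and a hypothetical :flush_example name; handle_flush/2 simply forwards each batch to the calling process so the result can be observed.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

test_pid = self()

{:ok, _pid} =
  GenBatcher.start_link(
    name: :flush_example,
    partitions: 2,
    handle_flush: fn items, _info -> send(test_pid, {:flushed, items}) end
  )

:ok = GenBatcher.insert(:flush_example, :a, partition_key: 0)
:ok = GenBatcher.insert(:flush_example, :b, partition_key: 1)

# Wait for the flushes instead of returning immediately (async?: false),
# and flush the partitions one at a time (concurrent?: false).
:ok = GenBatcher.flush(:flush_example, async?: false, concurrent?: false)
```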

flush_partition(gen_batcher, partition_key, opts \\ [])

@spec flush_partition(t(), partition_key(), keyword()) :: :ok

Flushes the given GenBatcher partition.

While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.

Options

A GenBatcher partition can be flushed with the following options:

  • :async? - An optional boolean/0 denoting whether or not the partition should be flushed asynchronously. Defaults to true.

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
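flush_partition/3 affects only the partition selected by the key. This sketch assumes the :gen_batcher package name, a hypothetical :flush_partition_example name, and PartitionSupervisor-style routing of integer keys (key 0 to the first partition, key 1 to the second).

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

test_pid = self()

{:ok, _pid} =
  GenBatcher.start_link(
    name: :flush_partition_example,
    partitions: 2,
    handle_flush: fn items, _info -> send(test_pid, {:flushed, items}) end
  )

:ok = GenBatcher.insert(:flush_partition_example, :a, partition_key: 0)
:ok = GenBatcher.insert(:flush_partition_example, :b, partition_key: 1)

:ok = GenBatcher.flush_partition(:flush_partition_example, 0, async?: false)

# Only the first partition was flushed; the second still holds its item.
[[], [:b]] = GenBatcher.dump(:flush_partition_example)
```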

info(gen_batcher, opts \\ [])

@spec info(
  t(),
  keyword()
) :: [partition_info()]

Returns information for all partitions for the given GenBatcher.

The partition_info/0 structs are returned in index order.

While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.

Options

Information about a GenBatcher can be retrieved with the following options:

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
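A quick way to look at every partition is to print the returned structs. This is a sketch assuming the :gen_batcher package name and a hypothetical :info_example name; the struct's fields are defined by GenBatcher.Partition.Info and are not assumed here, so the code only inspects them.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

{:ok, _pid} =
  GenBatcher.start_link(
    name: :info_example,
    partitions: 3,
    # :flush_meta is included in each partition_info/0 struct.
    flush_meta: %{sink: :example},
    handle_flush: fn _items, _info -> :ok end
  )

# One partition_info/0 struct per partition, in index order.
infos = GenBatcher.info(:info_example)
Enum.each(infos, fn info -> IO.inspect(info) end)
```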

insert(gen_batcher, item, opts \\ [])

@spec insert(t(), term(), keyword()) :: :ok

Inserts an item into the given GenBatcher.

Warning

When inserting multiple items into a GenBatcher, using insert_all/3 is far more efficient than inserting each item individually.

Options

An item can be inserted into a GenBatcher with the following options:

  • :partition_key - An optional partition_key/0 denoting which partition to insert the item into. If not specified, the partition is decided by the round-robin partitioner.

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
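The two routing modes can be sketched side by side: without :partition_key the round-robin partitioner chooses, while a shared key keeps related items together. The :gen_batcher package name, the :insert_example name, and the {:user, id, event} item shape are assumptions for illustration.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

{:ok, _pid} =
  GenBatcher.start_link(
    name: :insert_example,
    partitions: 4,
    handle_flush: fn _items, _info -> :ok end
  )

# No :partition_key, so the round-robin partitioner picks the partition.
:ok = GenBatcher.insert(:insert_example, :anonymous_event)

# Items sharing a :partition_key are routed to the same partition.
:ok = GenBatcher.insert(:insert_example, {:user, 42, :login}, partition_key: 42)
:ok = GenBatcher.insert(:insert_example, {:user, 42, :logout}, partition_key: 42)
```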

insert_all(gen_batcher, items, opts \\ [])

@spec insert_all(t(), Enumerable.t(), keyword()) :: non_neg_integer()

Inserts multiple items into the given GenBatcher and returns the number of items inserted.

All of the given items are inserted into the same partition.

Options

Items can be inserted into a GenBatcher with the following options:

  • :partition_key - An optional partition_key/0 denoting which partition to insert the items into. If not specified, the partition is decided by the round-robin partitioner.

  • :safe? - An optional boolean/0 denoting whether or not to flush immediately after a flush condition is met. If true and a flush condition is met, a flush is triggered before inserting the remaining items. Otherwise, a flush is triggered once all items have been inserted. Setting :safe? to false can improve throughput at the cost of allowing otherwise impossible batches (for example, a batch larger than the max_size of a {:size, max_size} flush trigger). Only relevant if a flush would be triggered before all items are inserted. Defaults to true.

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
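The effect of :safe? can be seen by inserting more items than a {:size, 3} trigger allows. This sketch assumes the :gen_batcher package name and the hypothetical names :safe_batcher and :unsafe_batcher; each batcher tags its flushed batches so they can be told apart.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

test_pid = self()

# Builds a handle_flush function that forwards batches tagged with `tag`.
reporter = fn tag -> fn items, _info -> send(test_pid, {tag, items}) end end

{:ok, _} =
  GenBatcher.start_link(name: :safe_batcher, flush_trigger: {:size, 3}, handle_flush: reporter.(:safe))

{:ok, _} =
  GenBatcher.start_link(name: :unsafe_batcher, flush_trigger: {:size, 3}, handle_flush: reporter.(:unsafe))

# Default safe?: true: a flush fires as soon as the size trigger is met,
# so no flushed batch should hold more than 3 items.
5 = GenBatcher.insert_all(:safe_batcher, 1..5)

# safe?: false: a single flush fires after all 5 items are buffered,
# producing a batch larger than the size trigger would otherwise allow.
5 = GenBatcher.insert_all(:unsafe_batcher, 1..5, safe?: false)
```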

partition_info(gen_batcher, partition_key, opts \\ [])

@spec partition_info(t(), partition_key(), keyword()) :: partition_info()

Returns information for the given GenBatcher partition.

While this functionality may occasionally be desirable in a production environment, it is intended to be used primarily for testing and debugging.

Options

Information about a GenBatcher partition can be retrieved with the following options:

  • :timeout - An optional timeout/0 (in ms) for this operation. Defaults to :infinity.
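Because partition_info/3 routes by partition_key/0, the same key used for inserts selects the partition to inspect. A sketch, assuming the :gen_batcher package name, a hypothetical :partition_info_example name, and a hypothetical "tenant-a" key; the struct is only printed, since its fields are defined by GenBatcher.Partition.Info.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

{:ok, _pid} =
  GenBatcher.start_link(
    name: :partition_info_example,
    partitions: 2,
    handle_flush: fn _items, _info -> :ok end
  )

# The key routes this request to the same partition it routes inserts to.
:ok = GenBatcher.insert(:partition_info_example, :event, partition_key: "tenant-a")
info = GenBatcher.partition_info(:partition_info_example, "tenant-a")
IO.inspect(info, label: "tenant-a partition")
```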

start_link(arg)

@spec start_link(module() | keyword()) :: Supervisor.on_start()

Starts a GenBatcher process linked to the current process.

For more information, see start_link/2.
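Passing only a module uses that module's callbacks as a {:dynamic_custom, ...} flush trigger and registers the batcher under the module's name. In this sketch MyApp.CountBatcher is hypothetical (it requests a flush on every second item) and the Hex package is assumed to be named :gen_batcher.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

defmodule MyApp.CountBatcher do
  # Hypothetical callback module that requests a flush on every second item.
  @behaviour GenBatcher

  @impl GenBatcher
  def initial_acc, do: 0

  @impl GenBatcher
  def handle_insert(_item, acc) when acc + 1 >= 2, do: :flush
  def handle_insert(_item, acc), do: {:cont, acc + 1}

  @impl GenBatcher
  def handle_flush(items, _info), do: IO.inspect(items, label: "flushed")
end

# With only a module, the batcher is registered under the module's name.
{:ok, _pid} = GenBatcher.start_link(MyApp.CountBatcher)

# The first item is buffered; a second insert would trigger a flush.
:ok = GenBatcher.insert(MyApp.CountBatcher, :a)
```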

start_link(module, opts)

@spec start_link(
  module() | nil,
  keyword()
) :: Supervisor.on_start()

Starts a GenBatcher process linked to the current process.

A GenBatcher can be started with a module/0 that implements the GenBatcher behaviour, an opts keyword/0, or a combination of the two.

Options

A GenBatcher can be started with the following options:

  • :batch_timeout - An optional timeout/0 denoting the maximum amount of time (in ms) allowed between flushes. Defaults to :infinity.

  • :blocking_flush? - An optional boolean/0 denoting whether or not a flush should block a partition process. This can be useful for applying backpressure. Defaults to false.

  • :flush_empty? - An optional boolean/0 denoting whether or not an empty partition should be flushed. Defaults to false.

  • :flush_meta - An optional term/0 to be included in partition_info/0 structs. Defaults to nil.

  • :flush_trigger - An optional specification for an item-based flush condition that can take any of the following forms:

    • {:size, max_size} where max_size is a pos_integer/0 denoting the maximum number of items allowed in a partition before a flush is triggered.

    • {:static_custom, init_acc, handle_insert} where init_acc is a term/0 used as the initial accumulator and handle_insert is a function invoked when an item is inserted into a partition. For more information, see handle_insert/2.

    • {:dynamic_custom, init_acc_fun, handle_insert} where init_acc_fun is a function invoked to generate an initial accumulator and handle_insert is a function invoked when an item is inserted into a partition. For more information about these functions, see initial_acc/0 and handle_insert/2 respectively.

    If a module/0 is provided and :flush_trigger is not explicitly specified, {:dynamic_custom, init_acc_fun, handle_insert} will be used where init_acc_fun and handle_insert are that module's initial_acc/0 and handle_insert/2 implementations respectively.

    If :flush_trigger is not specified and a module/0 is not provided, the GenBatcher will not have an item-based flush condition (meaning that inserting items will never trigger a flush).

  • :handle_flush - A function invoked to flush a partition. For more information, see handle_flush/2. When a module/0 is provided, this option is overridden by that module's handle_flush/2 implementation. If a module/0 is not provided, :handle_flush is required.

  • :name - An optional identifier for a GenBatcher. For more information, see t/0. If a module/0 is provided, that module's name will be used unless otherwise specified. Defaults to GenBatcher.

  • :ordering - An optional ordering scheme (:fifo or :lifo) for buffered items. Defaults to :fifo. If the ordering of items is unimportant for a flush operation, :lifo is slightly more efficient.

  • :partitions - An optional pos_integer/0 denoting the number of partitions. Defaults to 1. For more information, see Partitioning.

  • :shutdown - An optional timeout/0 denoting the maximum amount of time (in ms) to allow flushes to finish on shutdown or :brutal_kill if flushes should be stopped immediately. Defaults to :infinity. For more information, see Shutdown.
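Several of these options can be combined without a module. The sketch below uses a {:static_custom, init_acc, handle_insert} trigger together with :batch_timeout; the :gen_batcher package name, the :options_example name, the 100-item threshold, and the 50 ms timeout are illustrative assumptions.

```elixir
Mix.install([{:gen_batcher, "~> 1.1"}])

test_pid = self()

# Hypothetical item-count trigger for {:static_custom, init_acc, handle_insert}:
# the accumulator starts at 0 and a flush is requested on every 100th item.
count_trigger = fn _item, count ->
  if count + 1 >= 100, do: :flush, else: {:cont, count + 1}
end

{:ok, _pid} =
  GenBatcher.start_link(
    name: :options_example,
    partitions: 2,
    flush_trigger: {:static_custom, 0, count_trigger},
    # At most 50 ms between flushes; empty partitions are skipped because
    # :flush_empty? defaults to false.
    batch_timeout: 50,
    flush_meta: %{sink: :example},
    ordering: :lifo,
    shutdown: 5_000,
    handle_flush: fn items, _info -> send(test_pid, {:flushed, items}) end
  )

# One item is far below the trigger, so the batch timeout causes this flush.
:ok = GenBatcher.insert(:options_example, :a)
```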