View Source ExBuffer behaviour (ExBuffer v0.5.0)
An ExBuffer is a process that maintains a collection of items and flushes
them once certain conditions have been met.
An ExBuffer can flush based on a timeout, a maximum length (item count), a
maximum byte size, or a combination of the three. When multiple conditions are
used, the ExBuffer will flush when the first condition is met.
ExBuffer also includes a number of helpful tools for testing and debugging.
Summary
Functions
Dumps the contents of the given ExBuffer to a list, bypassing a flush
callback and resetting the buffer.
Flushes the given ExBuffer, regardless of whether or not the flush conditions
have been met.
Returns information about the given ExBuffer.
Starts an ExBuffer process linked to the current process.
Callbacks
Invoked to flush an ExBuffer.
The first argument (data) is a list of items inserted into the ExBuffer and the
second argument (opts) is a keyword list of flush options. See the :flush_callback
and :flush_meta options for ExBuffer.start_link/2 for more information.
This callback can return any term as the return value is disregarded by the ExBuffer.
This callback is required.
@callback handle_size(item :: term()) :: non_neg_integer()
Invoked to determine the size of an inserted item.
The only argument (item) is any term that was inserted into the ExBuffer.
This callback must return a non-negative integer representing the item's byte size.
This callback is optional. See the :size_callback option for ExBuffer.start_link/2
for information about the default implementation.
Functions
@spec dump( PartitionSupervisor.name(), keyword() ) :: list()
Dumps the contents of the given ExBuffer to a list, bypassing a flush
callback and resetting the buffer.
While this functionality may occasionally be desriable in a production environment, it is intended to be used primarily for testing and debugging.
Options
An ExBuffer can be dumped with the following options:
:partition- A non-negative integer representing the specific partition index to dump. By default, this function dumps all partitions and concatenates the results together. (Optional)
Examples
iex> ExBuffer.insert(ExBuffer, "foo")
iex> ExBuffer.insert(ExBuffer, "bar")
iex> ExBuffer.dump(ExBuffer)
["foo", "bar"]
iex> ExBuffer.insert(ExBuffer, "foo")
iex> ExBuffer.insert(ExBuffer, "bar")
iex> ExBuffer.dump(ExBuffer, partition: 0)
["foo"]
@spec flush( GenServer.server(), keyword() ) :: :ok
Flushes the given ExBuffer, regardless of whether or not the flush conditions
have been met.
While this functionality may occasionally be desriable in a production environment, it is intended to be used primarily for testing and debugging.
Options
An ExBuffer can be flushed with the following options:
:mode- A value denoting whether the flush will be synchronous or asynchronous. Possible values are ':syncand:async. By default, this value is:async. (Optional) *:partition` - A non-negative integer representing the specific partition index to flush. By default, this function flushes all partitions. (Optional) ## Example iex> ExBuffer.insert(ExBuffer, "foo") iex> ExBuffer.insert(ExBuffer, "bar") ...> ...> # Invokes flush callback on ["foo"] and then on ["bar"] iex> ExBuffer.flush(ExBuffer) :ok iex> ExBuffer.insert(ExBuffer, "foo") iex> ExBuffer.insert(ExBuffer, "bar") ...> ...> # Invokes flush callback on ["foo"] iex> ExBuffer.flush(ExBuffer, partition: 0) :ok
@spec info( GenServer.server(), keyword() ) :: [map()]
Returns information about the given ExBuffer.
This function returns a map per partition with the following keys:
:length- The number of items in theExBufferpartition.:max_length- The maximum length of theExBufferpartition after applying the:jitter_rate.:max_size- the maximum byte-size of theExBufferpartition after applying the:jitter_rate.:next_flush- The amount of time (in ms) until the next scheduled flush of theExBufferpartition (ornilif theExBufferwas started without a time limit).:partition- The index of theExBufferpartition.:size- The byte-size of theExBufferpartition.:timeout- The maximum amount of time (in ms) allowed between flushes of theExBufferpartition after applying the:jitter_rate.
While this functionality may occasionally be desriable in a production environment, it is intended to be used primarily for testing and debugging.
Options
The information about an ExBuffer can be retrieved with the following options:
:partition- A non-negative integer representing the specific partition index to retrieve information for. By default, this function retrieves information for all partitions. (Optional)
Examples
iex> ExBuffer.insert(ExBuffer, "foo")
iex> [%{length: length}, %{}] = ExBuffer.info(ExBuffer)
iex> length
1
iex> ExBuffer.insert(ExBuffer, "foo")
iex> [%{length: length, partition: 0}] = ExBuffer.info(ExBuffer, partition: 0)
iex> length
1
@spec insert(GenServer.server(), term()) :: :ok
Inserts the given item into the given ExBuffer based on the partitioner that the
given ExBuffer was started with.
Example
iex> ExBuffer.insert(ExBuffer, "foo")
:ok
@spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: non_neg_integer()
Inserts the given batch of items into the given ExBuffer based on the partitioner that
the given ExBuffer was started with. This function returns the number of items that were
inserted.
All items in the batch are inserted into the same partition.
Tip
When inserting multiple items into an
ExBuffer, this function will be far more performant than callingExBuffer.insert/2for each one. As such, whenever items become available in batches, this function should be preferred.
Options
A batch of items can be inserted into an ExBuffer with the following options:
:flush_mode- A value denoting whether how buffer will be flushed (if applicable). Possible values are:syncand:async. By default, this value is:sync, meaning that, if a flush condition is met while inserting items, theExBufferpartition will synchronously flush before continuing to insert items. If this value is set to:async, all items will be inserted before checking if any flush conditions have been met. Afterwards, if a flush condition has been met, theExBufferpartition will be flushed asynchronously. (Optional)
Example
iex> ExBuffer.insert_batch(ExBuffer, ["foo", "bar", "baz"])
3
@spec start_link( module() | nil, keyword() ) :: Supervisor.on_start()
Starts an ExBuffer process linked to the current process.
The first argument (module) is optional. It is intended to be used when calling
this function from a module that implements the ExBuffer behaviour. When a module
is passed, it may interact with the options that were passed in:
If the module implements the
handle_flush/2callback, it will override the:flush_callbackoption.If the module implements the
handle_size/1callback, it will override the:size_callbackoption.If a
:nameoption is not present, the module name will be used.
Options
An ExBuffer can be started with the following options:
:flush_callback- The function that will be invoked to handle a flush. This function should expect two parameters: a list of items and a keyword list of flush opts. The flush opts include the size and length of the buffer at the time of the flush, the partition index of the flushed buffer, and any provided metadata (see:flush_metafor more information). This function can return any term as the return value is disregarded by theExBuffer. (Required):buffer_timeout- A non-negative integer representing the maximum time (in ms) allowed between flushes of theExBuffer. Once this amount of time has passed, theExBufferwill be flushed. By default, anExBufferdoes not have a timeout. (Optional):flush_meta- A term to be included in the flush opts under themetakey. By default, this value will benil. (Optional):jitter_rate- A float between 0 and 1 that is used to offset the limits ofExBufferpartitions. Limits are decreased by a random rate between 0 and this value. By default, no jitter is applied to anExBuffer. (Optional):max_length- A non-negative integer representing the maximum allowed length (item count) of theExBuffer. Once the limit is hit, theExBufferwill be flushed. By default, anExBufferdoes not have a max length. (Optional):max_size- A non-negative integer representing the maximum allowed size (in bytes) of theExBuffer. Once the limit is hit (or exceeded), theExBufferwill be flushed. The:size_callbackoption determines how item size is computed. By default, anExBufferdoes not have a max size. (Optional):name- The registered name for theExBuffer. This must be either an atom or a:viatuple. By default (when an implementation module is not used), the name of anExBufferisExBuffer. (Optional):partitioner- The strategy for assigning items to a partition. The partitioner can be either:rotatingor:random. The former assigns items to partitions in a round-robin fashion and the latter assigns items randomly. By default, anExBufferuses a:rotatingpartitioner. (Optional):partitions- The number of partitions for theExBuffer. By default, anExBufferhas 1 partition. (Optional):size_callback- The function that will be invoked to determine the size of an item. This function should expect a single parameter representing an item and should return a single non-negative integer representing that item's byte size. The defaultExBuffersize callback isKernel.byte_size/1(:erlang.term_to_binary/1is used to convert non-bitstring inputs to binary if necessary). (Optional)
Additionally, an ExBuffer can also be started with any GenServer options.