DataBuffer v0.1.0
DataBuffer behaviour
Defines a data buffer.
A data buffer is an efficient way to maintain a local list of values associated with a given key that can later be flushed to persistent storage. In fast-moving systems, this provides a scalable way to keep track of values without putting heavy load on a database.
Creating a buffer is as easy as:
defmodule Buffer do
  use DataBuffer
end
Once we have defined our buffer module, we must then implement the handle_flush/2
callback that allows us to perform an operation with a provided key and list of
values.
This could mean something like dumping the data to a database.
defmodule Buffer do
  use DataBuffer

  def handle_flush(key, data) do
    # write to the database...
    # handle_flush MUST return an :ok atom
    :ok
  end
end
We must then add our buffer to our supervision tree.
children = [
  Buffer
]
Each flush operation is handled with its own supervised Task process. By
default, a failed flush operation will retry about 3 times within 3 seconds.
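The retry behaviour described above can be pictured with a minimal sketch. This is not DataBuffer's actual implementation: `RetrySketch` and its `run/3` function are hypothetical names, and the values `3` and `1_000` are assumptions chosen to match "about 3 times within 3 seconds". The sketch runs a flush function inside a task started under a `Task.Supervisor` and retries whenever the function returns anything other than `:ok` or raises.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of DataBuffer: calls `fun` once, then
  # retries up to `retry_max` times, sleeping `retry_delay` ms in between.
  def run(fun, retry_max, retry_delay, attempt \\ 0) do
    case safe_call(fun) do
      :ok ->
        :ok

      _other when attempt < retry_max ->
        Process.sleep(retry_delay)
        run(fun, retry_max, retry_delay, attempt + 1)

      _other ->
        # This is the point where an error callback would take over.
        {:error, :retries_exhausted}
    end
  end

  # Treat a raised exception the same as an invalid return value.
  defp safe_call(fun) do
    fun.()
  rescue
    _exception -> :error
  end
end

# Run one flush in its own supervised task.
{:ok, sup} = Task.Supervisor.start_link()

task =
  Task.Supervisor.async_nolink(sup, fn ->
    RetrySketch.run(fn -> :ok end, 3, 1_000)
  end)

result = Task.await(task)
```

Running each flush in its own task keeps a slow or failing flush from blocking inserts for other keys.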
Usage
With our buffer started, we can now insert values by key. A key can be any valid term.
Buffer.insert("mykey", "myval")
The values for each key are maintained in an ETS table. All keys are scoped to the given buffer module, so multiple buffers using the same keys will not cause issues.
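To illustrate how module-scoped keys can coexist in ETS, here is a small sketch. The table layout is an assumption made for illustration (a `:duplicate_bag` table keyed by `{buffer_module, key}`); the source does not describe DataBuffer's internal table structure, and `BufferA` and `BufferB` are hypothetical module names.

```elixir
# Assumption: one :duplicate_bag table whose keys are {buffer_module, key}.
table = :ets.new(:buffer_sketch, [:duplicate_bag, :public])

:ets.insert(table, {{BufferA, "mykey"}, "a1"})
:ets.insert(table, {{BufferA, "mykey"}, "a2"})
:ets.insert(table, {{BufferB, "mykey"}, "b1"})

# :ets.take/2 returns and removes every object stored under the given key,
# which is one way to collect the values for a flush.
flushed = for {_key, value} <- :ets.take(table, {BufferA, "mykey"}), do: value

# The same "mykey" under another buffer module is untouched.
remaining = :ets.lookup(table, {BufferB, "mykey"})
```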
With the default buffer we set up above, the "mykey" data will be flushed after 5 seconds. Assuming no new operations occur on our buffer, the process will be placed into hibernation after 10 seconds. All of this is configurable through the options below.
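Hibernating after a period of inactivity is commonly done with GenServer timeouts. The following is a minimal sketch of that general technique, not DataBuffer's own code; `HibernateSketch` and its functions are hypothetical names.

```elixir
defmodule HibernateSketch do
  use GenServer

  def start_link(timeout), do: GenServer.start_link(__MODULE__, timeout)
  def insert(pid, value), do: GenServer.cast(pid, {:insert, value})

  @impl true
  def init(timeout), do: {:ok, %{timeout: timeout, values: []}, timeout}

  @impl true
  def handle_cast({:insert, value}, state) do
    # Every operation re-arms the idle timeout.
    {:noreply, %{state | values: [value | state.values]}, state.timeout}
  end

  @impl true
  def handle_info(:timeout, state) do
    # No message arrived within the timeout: hibernate until the next one.
    {:noreply, state, :hibernate}
  end
end
```

A hibernated process has its memory compacted and wakes up when the next message arrives, which suits buffers that see bursts of traffic followed by long idle periods.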
Options
A data buffer comes with a few configurable options. We can pass any of these options along with the use macro.
use DataBuffer, interval: 60_000, jitter: 20_000
:partitions - The number of table partitions to use for the buffer. Defaults to 1.
:interval - The time in milliseconds between the first insert for a given key and its next flush callback being invoked. Defaults to 5_000.
:jitter - The maximum time in milliseconds that will be added to interval to add some randomness to each flush invocation. The time added is randomly selected between 0 and jitter. Defaults to 0.
:timeout - The time in milliseconds between the last operation on a buffer and the process being hibernated. Defaults to 10_000.
:retry_delay - The time in milliseconds between a handle_flush/2 callback failing and the next attempt occurring. Defaults to 1_000.
:retry_max - The maximum number of retry attempts that will occur for the handle_flush/2 callback.
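The interaction between :interval and :jitter can be sketched as simple arithmetic. `FlushDelaySketch.next_delay/2` is a hypothetical helper written for illustration, assuming the added jitter is drawn uniformly from 0 to jitter inclusive; the library's own calculation may differ.

```elixir
defmodule FlushDelaySketch do
  # Hypothetical helper: returns interval plus a random value in 0..jitter.
  # :rand.uniform(n) returns an integer in 1..n, hence the + 1 and - 1.
  def next_delay(interval, jitter) when jitter > 0 do
    interval + :rand.uniform(jitter + 1) - 1
  end

  # With no jitter, the delay is exactly the interval.
  def next_delay(interval, _jitter), do: interval
end

# With interval: 60_000 and jitter: 20_000, each flush would be scheduled
# somewhere between 60 and 80 seconds after the first insert for the key.
delay = FlushDelaySketch.next_delay(60_000, 20_000)
```

Spreading flushes out this way helps avoid many keys hitting the database at the same instant.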
Errors
If the handle_flush/2 callback still returns an invalid value or raises an exception
after :retry_max retry attempts, the handle_error/2 callback will be invoked.
This callback receives the same key and data values as handle_flush/2, but it
should assume that our normal persistence layer is no longer functional.
It is then left to the developer to decide how best to handle this situation.
def handle_error(key, data) do
  # Put the data back into the buffer...
  # Or put the data to local disk...
  :ok
end
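As one possible take on the "put the data to local disk" option, the sketch below serializes the key and data to a file. `DiskFallbackSketch`, the `buffer_fallback` directory name, and `read_all/0` are all hypothetical; in a real buffer, the body of `handle_error/2` would live inside the buffer module itself.

```elixir
defmodule DiskFallbackSketch do
  # Hypothetical location; choose a directory suited to your deployment.
  defp dir, do: Path.join(System.tmp_dir!(), "buffer_fallback")

  # Same shape as handle_error/2: persist the key and data to a local file.
  def handle_error(key, data) do
    File.mkdir_p!(dir())
    name = "#{System.os_time(:microsecond)}-#{System.unique_integer([:positive])}.bin"
    File.write!(Path.join(dir(), name), :erlang.term_to_binary({key, data}))
    :ok
  end

  # Reads back every dumped {key, data} pair, e.g. to replay it later.
  def read_all do
    dir()
    |> Path.join("*.bin")
    |> Path.wildcard()
    |> Enum.map(fn path -> path |> File.read!() |> :erlang.binary_to_term() end)
  end
end
```

Writing each failure to its own file keeps the fallback independent of the database that just failed.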
Summary
Callbacks
count(key)
Returns the current number of items associated with a given key from the buffer.
flush(key)
Asynchronously flushes a given key from the buffer.
handle_error(key, data)
Callback for handling errors that occur during flush invocation.
handle_flush(key, data)
Callback for flushing a key from the buffer.
insert(key, value)
Inserts a value to the buffer that is associated with a given key.
start_link(options)
Starts the buffer process.
Types
option()
option() ::
  {:interval, non_neg_integer()}
  | {:jitter, non_neg_integer()}
  | {:timeout, non_neg_integer()}
  | {:partitions, non_neg_integer()}
  | {:retry_delay, non_neg_integer()}
  | {:retry_max, non_neg_integer()}
options()
options() :: [option()]
t()
t() :: module()
Callbacks
count(key)
Returns the current number of items associated with a given key from the buffer.
flush(key)
flush(key :: any()) :: :ok
Asynchronously flushes a given key from the buffer.
handle_error(key, data)
Callback for handling errors that occur during flush invocation.
If the flush attempts hit the :retry_max, this function will be called and
provided with the key as well as its current data. It is then up to the developer
to decide how best to handle the data.
handle_flush(key, data)
Callback for flushing a key from the buffer.
When a buffer key hits its set time interval, this function will be called and provided with the key as well as its current data.
This function is called within its own Task and is supervised. If the
callback does not return :ok, the task will fail and attempt retries
based on the :retry_delay and :retry_max options.
insert(key, value)
Inserts a value to the buffer that is associated with a given key.
Each key is scoped to the buffer module, so duplicate keys across different buffer modules will not cause issues.
start_link(options)
start_link(options()) :: GenServer.on_start()
Starts the buffer process.
Options
The available options are the same as those described in the "Options" section.