ETS-based partitioned map buffer for high-throughput key-value data processing.
PartitionedBuffer.Map buffers key-value entries using :set ETS tables,
giving last-write-wins semantics for duplicate keys. It periodically processes
buffered entries using a configurable processor function. Like Queue, it
partitions writes to reduce lock contention under high throughput and uses
double-buffering so that writes can continue while a batch is being processed.
It also supports versioned conditional updates via put_newer/5 and
put_all_newer/3, which use "newer version wins" semantics: an entry is
written only if the key doesn't exist or the new version is greater than
the existing one.
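To make the partitioning idea concrete, here is a minimal sketch that routes each key to one of several :set ETS tables by hashing it. It is not PartitionedBuffer's actual implementation: the module name PartitionSketch, its functions, and the use of :erlang.phash2/2 for routing are assumptions made purely for illustration.

```elixir
defmodule PartitionSketch do
  # Hypothetical illustration only; not the PartitionedBuffer implementation.

  # Creates `n` unnamed :set tables and returns them as a tuple for fast indexing.
  def new(n) when is_integer(n) and n > 0 do
    1..n
    |> Enum.map(fn _ -> :ets.new(:partition, [:set, :public, write_concurrency: true]) end)
    |> List.to_tuple()
  end

  # Picks the partition for a key by hashing it into the range 0..n-1.
  def partition_for(partitions, key) do
    elem(partitions, :erlang.phash2(key, tuple_size(partitions)))
  end

  # Writes to the key's partition; a :set table overwrites an existing key,
  # which gives last-write-wins behavior for duplicate keys.
  def put(partitions, key, value) do
    true = :ets.insert(partition_for(partitions, key), {key, value})
    :ok
  end

  # Reads from the key's partition, returning `default` when the key is absent.
  def get(partitions, key, default \\ nil) do
    case :ets.lookup(partition_for(partitions, key), key) do
      [{^key, value}] -> value
      [] -> default
    end
  end

  # Total number of entries across all partitions.
  def size(partitions) do
    partitions
    |> Tuple.to_list()
    |> Enum.map(&:ets.info(&1, :size))
    |> Enum.sum()
  end
end
```

Because the same key always hashes to the same table, writers that touch different keys tend to land on different tables, which is the general reason partitioning can reduce contention.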
Examples
Standalone Usage
# Start a map buffer with a custom processor
iex> {:ok, _sup_pid} =
...>   PartitionedBuffer.Map.start_link(
...>     name: :my_map_buffer,
...>     processor: fn batch -> IO.inspect(batch) end
...>   )
# Put a single entry
iex> PartitionedBuffer.Map.put(:my_map_buffer, :key1, "value1")
:ok
# Put multiple entries at once
iex> PartitionedBuffer.Map.put_all(:my_map_buffer, %{key2: "value2", key3: "value3"})
:ok
# Delete an entry
iex> PartitionedBuffer.Map.delete(:my_map_buffer, :key1)
:ok
# Versioned put (newer version wins)
iex> PartitionedBuffer.Map.put_newer(:my_map_buffer, :key4, "v1", 100)
:ok
iex> PartitionedBuffer.Map.put_newer(:my_map_buffer, :key4, "v2", 200)
:ok
iex> PartitionedBuffer.Map.get(:my_map_buffer, :key4)
"v2"
# Check buffer size
iex> PartitionedBuffer.Map.size(:my_map_buffer)
3
# Stop the buffer gracefully (processes remaining items)
iex> PartitionedBuffer.Map.stop(:my_map_buffer)
:ok
Adding to a Supervision Tree
children = [
  {PartitionedBuffer.Map,
   name: :my_map_buffer,
   processor: &MyApp.EventProcessor.process_batch/1}
]
Supervisor.start_link(children, strategy: :one_for_one)
Processor
The processor function receives a list of {key, value, version, updates}
tuples, where version is the entry version (set via put_newer/5 and
put_all_newer/3; 0 for regular put/4 entries), and updates is the
number of times an existing key was updated (only tracked for versioned
updates; regular put/4 entries always have updates set to 0):
fn batch ->
  # batch is [{key1, value1, version1, updates1}, {key2, value2, version2, updates2}, ...]
  Enum.each(batch, fn {k, v, _version, _updates} -> process(k, v) end)
end
Options
See PartitionedBuffer for start and runtime options.
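Returning to the processor contract described in the Processor section, the following is a small sketch of a named processor that consumes {key, value, version, updates} tuples. Only the tuple shape comes from the documentation; the module ProcessorSketch and the shape of its return value are hypothetical, and the documentation does not state what a processor is expected to return.

```elixir
defmodule ProcessorSketch do
  # Hypothetical processor; only the 4-tuple shape is taken from the docs.

  # Collects the batch into a key => value map and records which keys were
  # overwritten at least once (updates > 0) before the batch was handed off.
  def process_batch(batch) when is_list(batch) do
    Enum.reduce(batch, %{entries: %{}, rewritten: []}, fn {key, value, _version, updates}, acc ->
      acc = %{acc | entries: Map.put(acc.entries, key, value)}

      if updates > 0 do
        %{acc | rewritten: [key | acc.rewritten]}
      else
        acc
      end
    end)
  end
end
```

A function like this could be passed as processor: &ProcessorSketch.process_batch/1, in the same way the supervision tree example passes &MyApp.EventProcessor.process_batch/1.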
Summary
Functions
child_spec - Returns the map buffer child spec.
delete - Deletes a key from the buffer's current write table.
get - Gets the value for the given key from the buffer's current write table.
put - Puts a single key-value entry into the buffer.
put_all - Puts multiple key-value entries into the buffer.
put_all_newer - Puts multiple versioned key-value entries into the buffer.
put_newer - Puts a single versioned key-value entry into the buffer.
size - Returns the map buffer size (total number of entries across all partitions).
start_link - Starts a new map buffer.
stop - Stops a map buffer gracefully.
update_options - Updates the options for the map buffer.
Types
@type buffer() :: PartitionedBuffer.buffer()
Proxy type for a buffer
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns the map buffer child spec.
Deletes a key from the buffer's current write table.
Note: If the entry has already been handed off for processing (via double-buffering), this delete will not affect the in-flight batch.
Parameters
- buffer - The buffer name (atom).
- key - The key to delete.
- opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
# Simple delete
delete(:my_buffer, :key1)
# With custom partition routing
delete(:my_buffer, :key1, partition_key: fn {k, _v} -> k end)
Gets the value for the given key from the buffer's current write table.
Returns default if the key is not found.
Note: This reads from the current write table only. Entries already handed off for processing will not be visible.
Parameters
- buffer - The buffer name (atom).
- key - The key to look up.
- default - The default value if key is not found (defaults to nil).
- opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
# Simple get
get(:my_buffer, :key1)
# With custom partition routing
get(:my_buffer, :key1, partition_key: fn {k, _v} -> k end)
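The visibility note above follows from double-buffering in general. The sketch below shows a simplified form of the idea, in which a fresh write table is swapped in and the old table's contents become the in-flight batch. It is an illustration under stated assumptions, not the library's code: DoubleBufferSketch, hand_off/1, and the choice to create a new table on every swap (rather than alternating between two long-lived tables) are all hypothetical.

```elixir
defmodule DoubleBufferSketch do
  # Hypothetical illustration of double-buffering; not the library's code.

  # Starts with a single empty write table.
  def new, do: %{write: new_table()}

  # Writes always go to the current write table.
  def put(%{write: table}, key, value) do
    true = :ets.insert(table, {key, value})
    :ok
  end

  # Reads only consult the current write table.
  def get(%{write: table}, key, default \\ nil) do
    case :ets.lookup(table, key) do
      [{^key, value}] -> value
      [] -> default
    end
  end

  # Swaps in a fresh write table and returns the old table's contents as the
  # in-flight batch. Entries in that batch are no longer visible to get/3.
  def hand_off(%{write: old} = state) do
    state = %{state | write: new_table()}
    batch = :ets.tab2list(old)
    true = :ets.delete(old)
    {batch, state}
  end

  defp new_table, do: :ets.new(:buffer, [:set, :public])
end
```

After hand_off/1, a lookup for a key that was in the batch returns the default, which mirrors the behavior the note describes for entries already handed off for processing.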
Puts a single key-value entry into the buffer.
Parameters
- buffer - The buffer name (atom).
- key - The key for the entry.
- value - The value for the entry.
- opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
# Simple put
put(:my_buffer, :key1, "val1")
# With custom partition routing
put(:my_buffer, :key1, "val1", partition_key: fn {k, _v} -> k end)
Puts multiple key-value entries into the buffer.
Accepts either a map or a list of {key, value} tuples.
Parameters
- buffer - The buffer name (atom).
- entries - A map or list of {key, value} tuples.
- opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
# Using a map
put_all(:my_buffer, %{key1: "val1", key2: "val2"})
# Using a list of tuples
put_all(:my_buffer, [{:key1, "val1"}, {:key2, "val2"}])
# With custom partition routing
put_all(:my_buffer, %{key1: "val1"}, partition_key: fn {k, _v} -> k end)
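As a small aside on why both input shapes can share one code path: an Elixir map enumerates as {key, value} tuples, so it can be converted to the list form directly. The helper below is a hypothetical sketch (EntriesSketch is not part of the library) of how such input might be normalized.

```elixir
defmodule EntriesSketch do
  # Hypothetical helper; not part of PartitionedBuffer.

  # A map becomes a list of {key, value} tuples; a list is passed through.
  def normalize(entries) when is_map(entries), do: Map.to_list(entries)
  def normalize(entries) when is_list(entries), do: entries
end
```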
Puts multiple versioned key-value entries into the buffer.
Uses "newer version wins" semantics for each entry.
Parameters
- buffer - The buffer name (atom).
- entries - A list of {key, value, version} tuples.
- opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
entries = [
  {:user_1, %{name: "Alice"}, 100},
  {:user_2, %{name: "Bob"}, 200}
]
put_all_newer(:my_buffer, entries)
Puts a single versioned key-value entry into the buffer.
Uses "newer version wins" semantics. The entry is written only if:
- The key doesn't exist, or
- The new version is greater than the existing version.
This is useful when only the latest version of a piece of data should be stored, such as in event sourcing or state synchronization.
Parameters
- buffer - The buffer name (atom).
- key - The key for the entry.
- value - The value for the entry.
- version - The version (must be comparable with >).
- opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
# Using timestamps as versions (monotonic time is only comparable within a single VM instance)
put_newer(:my_buffer, :user_123, %{name: "Alice"}, System.monotonic_time())
# Using sequence numbers
put_newer(:my_buffer, :counter, 42, 5)
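The "newer version wins" rule can be sketched with a plain ETS table as follows. This is an illustration under assumptions, not the library's implementation: NewerWinsSketch is hypothetical, the lookup-then-insert sequence is not atomic under concurrent writers, returning :ok for a stale write is an assumption, and incrementing updates only on a successful overwrite is one possible reading of the updates field described for the processor.

```elixir
defmodule NewerWinsSketch do
  # Hypothetical illustration; not the PartitionedBuffer implementation.
  # Not safe for concurrent writers: lookup and insert are separate steps.

  def new, do: :ets.new(:versioned, [:set, :public])

  # Rows are stored as {key, value, version, updates}, mirroring the tuple
  # shape that the processor receives.
  def put_newer(table, key, value, version) do
    case :ets.lookup(table, key) do
      [] ->
        # Key doesn't exist: write it with an update count of 0.
        :ets.insert(table, {key, value, version, 0})
        :ok

      [{^key, _old_value, existing, updates}] when version > existing ->
        # Strictly newer version: overwrite and bump the update count.
        :ets.insert(table, {key, value, version, updates + 1})
        :ok

      _stale ->
        # Same or older version: keep the existing row (return value assumed).
        :ok
    end
  end
end
```

With this sketch, writing version 100, then 200, then 150 for the same key leaves the version 200 value in place with an update count of 1.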
@spec size(buffer()) :: non_neg_integer()
Returns the map buffer size (total number of entries across all partitions).
Examples
size(:my_buffer)
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new map buffer.
Options
See PartitionedBuffer for start options.
Examples
PartitionedBuffer.Map.start_link(name: :my_map_buffer)
Stops a map buffer gracefully.
Examples
PartitionedBuffer.Map.stop(:my_map_buffer)
Updates the options for the map buffer.
Options
See PartitionedBuffer.update_options/2 for the updatable options.
Examples
# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval_ms: 100)