ExKpl (ExKpl v0.2.0) View Source

Elixir implementation of the Kinesis Producer Library record aggregator.

This is a port of the Erlang implementation included in adroll/erlmld

Basic usage:

iex> {_, aggregator} = ExKpl.add(ExKpl.new(), {"partition_key", "data"})
...> ExKpl.finish(aggregator)
{{"partition_key", <<243, 137, 154, 194, 10, 13, 112, 97, 114, 116, 105, 116, 105, 111, 110, 95, 107, 101, 121, 26, 8, 8, 0, 26, 4, 100, 97, 116, 97, 208, 54, 153, 218, 90, 34, 47, 163, 33, 8, 173, 27, 217, 85, 161, 78>>, nil}, %ExKpl{agg_explicit_hash_key: nil, agg_partition_key: nil, agg_size_bytes: 0, explicit_hash_keyset: %ExKpl.Keyset{key_to_index: %{}, rev_keys: []}, num_user_records: 0, partition_keyset: %ExKpl.Keyset{key_to_index: %{}, rev_keys: []}, rev_records: []}}

Typically you will use it like:

case ExKpl.add(aggregator, {partition_key, data}) do
  {nil, aggregator} ->
    aggregator

  {full_record, aggregator} ->
    send_record_to_kinesis(full_record)
    aggregator
end

You can force the current records to be aggregated with finish/1,2

Link to this section Summary

Link to this section Types

Specs

aggregated_record() :: {key(), serialized_data(), key()}

Specs

key() :: binary() | nil

Specs

new_opts() :: [{:max_bytes_per_record, pos_integer()}]

Specs

raw_data() :: binary()

Specs

serialized_data() :: binary()

Specs

t() :: %ExKpl{
  agg_explicit_hash_key: key(),
  agg_partition_key: key(),
  agg_size_bytes: non_neg_integer(),
  explicit_hash_keyset: ExKpl.Keyset.t(),
  max_bytes_per_record: pos_integer(),
  num_user_records: non_neg_integer(),
  partition_keyset: ExKpl.Keyset.t(),
  rev_records: [binary()]
}

Specs

user_record() :: {key(), raw_data(), key()}

Link to this section Functions

Specs

add(t(), {key(), binary()} | {key(), binary(), key()}) ::
  {aggregated_record() | nil, t()}

Specs

add_all(t(), [user_record()]) :: {[aggregated_record()], t()}

Specs

count(t()) :: non_neg_integer()

Specs

finish(t()) :: {aggregated_record() | nil, t()}
Link to this function

finish(agg, should_deflate?)

View Source

Specs

new(new_opts()) :: t()

Specs

size_bytes(t()) :: non_neg_integer()