Klife.Client behaviour (Klife v0.5.0)

Defines a Kafka client.

To use it, you must complete three steps:

  • Use it in a module
  • Configure the module in your config file
  • Start the module in your supervision tree

Using it in a module

When used, it expects an :otp_app option naming the OTP application that holds the client configuration.

defmodule MyApp.MyClient do
  use Klife.Client, otp_app: :my_app
end

use Klife.Client

When you use Klife.Client, it will extend your module in two ways:

  • Define it as a proxy to a subset of the functions on the Klife module, using your module's name as the client_name parameter. One example is MyClient.produce/2, which forwards both arguments to Klife.produce/3 and injects MyClient as the second argument.

  • Define it as a supervisor by calling use Supervisor and implementing the related functions, such as start_link/1 and init/1, so it can be started under your app's supervision tree.
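
The proxy behaviour described above can be illustrated with a toy macro. This is only a sketch of the general `use` pattern, not Klife's actual implementation: ToyLib, ToyClient and MyToyClient are hypothetical names, and the supervisor part is left out.

```elixir
# Hypothetical stand-in for a library module whose functions take the
# client name as an explicit second argument.
defmodule ToyLib do
  def produce(record, client_name, opts \\ []) do
    {:ok, %{record: record, client: client_name, opts: opts}}
  end
end

# Hypothetical module: `use ToyClient` injects a proxy function.
defmodule ToyClient do
  defmacro __using__(_opts) do
    quote do
      # __MODULE__ expands to the module that calls `use`,
      # so each client forwards its own name.
      def produce(record, opts \\ []) do
        ToyLib.produce(record, __MODULE__, opts)
      end
    end
  end
end

defmodule MyToyClient do
  use ToyClient
end

# The two-argument call is forwarded as a three-argument call.
{:ok, result} = MyToyClient.produce(%{value: "some_val"}, [])
MyToyClient = result.client
```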

Configuration

The client has several configuration options, described below. A typical configuration looks something like this:

config :my_app, MyApp.MyClient,
  connection: [
    bootstrap_servers: ["localhost:19092", "localhost:29092"],
    ssl: false
  ],
  producers: [
    [
      name: :my_custom_producer,
      linger_ms: 5,
      max_in_flight_requests: 10
    ]
  ],
  topics: [
    [name: "my_topic_0", producer: :my_custom_producer]
  ]

You can see more configuration examples in the "Client configuration examples" section, or a working application example in the example folder of the project's repository.

Configuration options:

  • :connection (non-empty keyword/0) - Required.

    • :bootstrap_servers (list of String.t/0) - Required. List of servers used to establish the initial connection (e.g. ["localhost:9092", "localhost:9093"]).

    • :ssl (boolean/0) - Specify the underlying socket module. Use :ssl if true and :gen_tcp if false. The default value is false.

    • :connect_opts (list of term/0) - Options used to configure the socket connection, which are forwarded to the connect/3 function of the underlying socket module (see the :ssl option above). The default value is [inet_backend: :socket, active: false].

    • :socket_opts (list of term/0) - Options used to configure the open socket, which are forwarded to the setopts/2 function of the module matching the underlying socket: :inet for :gen_tcp and :ssl for :ssl (see the :ssl option above). The default value is [keepalive: true].

    • :sasl_opts (list of term/0) - Options to configure SASL authentication. See the SASL section for supported mechanisms and examples. The default value is [].

  • :default_producer (atom/0) - Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option. If not provided a default producer will be started automatically. The default value is :klife_default_producer.

  • :default_partitioner (atom/0) - Partitioner module to be used on produce API calls when a specific partitioner is not provided via configuration or option. The default value is Klife.Producer.DefaultPartitioner.

  • :default_txn_pool (atom/0) - Name of the txn pool to be used on transactions when a :pool_name is not provided as an option. If not provided a default txn pool will be started automatically. The default value is :klife_default_txn_pool.

  • :txn_pools (List of Klife.TxnProducerPool configurations) - List of configurations, each starting a pool of transactional producers for use with the transactional API. The default value is [].

  • :producers (List of Klife.Producer configurations) - List of configurations, each starting a new producer for use with the produce API. The default value is [].

  • :topics (List of Klife.Topic configurations) - List of topics that may have special configurations. The default value is [].

  • :disabled_features (list of atoms naming the features to disable) - :producer disables the producer feature. :txn_producer disables transactions. The default value is [].
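
As a further illustration of the options above, a configuration that uses SSL, names a configured producer as the default, and turns transactions off might look like the sketch below. It only uses keys documented in this section; the broker address and producer name are assumptions, and whether this combination suits a given deployment is not verified here.

```elixir
import Config

# Sketch only: the host, port and producer name are placeholders.
config :my_app, MyApp.MyClient,
  connection: [
    bootstrap_servers: ["localhost:19092"],
    # true selects :ssl as the socket module, false selects :gen_tcp
    ssl: true
  ],
  # Used by produce calls that do not name a producer
  default_producer: :my_custom_producer,
  producers: [
    [name: :my_custom_producer, linger_ms: 5]
  ],
  # Disables the transactional producer feature
  disabled_features: [:txn_producer]
```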

Starting it

Finally, it must be started in your application. It will look something like this:

defmodule MyApp.Application do
  def start(_type, _args) do
    children = [
      # some other modules...,
      MyApp.MyClient
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Producer API overview

To interact with the producer API, you will work with the Klife.Record module as your main input and output data structure.

Usually you will give a record to a producer API function, and it will return an enriched record with new attributes based on what happened.

So in summary the interaction goes like this:

  • Build one or more Klife.Record structs
  • Pass them to a producer API function
  • Receive an enriched version of the provided records

rec = %Klife.Record{value: "some_val", topic: "my_topic_1"}
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec)

Summary

Producer API

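produce(record, opts)
Produce a single record.

produce_async(record, opts)
Produce a single record asynchronously.

produce_batch(list_of_records, opts)
Produce a batch of records.

produce_batch_async(record, opts)
Produce a batch of records asynchronously.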

Transaction API

produce_batch_txn(list_of_records, opts)
Transactionally produce a batch of records.

transaction(fun, opts)
Runs the given function inside a transaction.

Producer API

produce(record, opts)

@callback produce(record(), opts :: Keyword.t()) :: {:ok, record()} | {:error, record()}

Produce a single record.

It expects a Klife.Record struct containing at least :value and :topic, and returns an ok/error tuple with the enriched version of the input record, as described in "Producer API overview".

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

Examples

iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)

produce_async(record, opts)

@callback produce_async(record(), opts :: Keyword.t()) :: :ok

Produce a single record asynchronously.

The same as produce/2 but returns immediately. Accepts a callback option to execute arbitrary code after the response is obtained.

Semantics and guarantees

This function executes the callback using Task.start/1. Therefore, there are no guarantees about record delivery or callback execution.

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

  • :callback (term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument for an MFA and is the only argument for anonymous functions.
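
The two callback shapes accepted by :callback can be pictured with the following sketch. CallbackRunner is a hypothetical module written for illustration, not Klife's internal code; it only shows what "the result is injected as the first argument" means for an MFA tuple.

```elixir
defmodule CallbackRunner do
  # MFA form: the result is prepended to the user-supplied argument list.
  def run({mod, fun, args}, result) when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [result | args])
  end

  # Anonymous function form: the result is the only argument.
  def run(fun, result) when is_function(fun, 1) do
    fun.(result)
  end
end

# With {String, :duplicate, [2]} the call becomes String.duplicate("ab", 2).
"abab" = CallbackRunner.run({String, :duplicate, [2]}, "ab")

# With an anonymous function the result is passed as is.
{:got, :some_result} = CallbackRunner.run(fn r -> {:got, r} end, :some_result)
```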

Examples

Anonymous Function:

iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> callback = fn resp ->
...>    {:ok, enriched_rec} = resp
...>    true = is_number(enriched_rec.offset)
...>    true = is_number(enriched_rec.partition)
...> end
iex> :ok = MyClient.produce_async(rec, callback: callback)

Using MFA:

iex> defmodule CB do
...>    def exec(resp, my_arg1, my_arg2) do
...>      "my_arg1" = my_arg1
...>      "my_arg2" = my_arg2
...>      {:ok, enriched_rec} = resp
...>      true = is_number(enriched_rec.offset)
...>      true = is_number(enriched_rec.partition)
...>    end
...> end
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> :ok = MyClient.produce_async(rec, callback: {CB, :exec, ["my_arg1", "my_arg2"]})

produce_batch(list_of_records, opts)

@callback produce_batch(list_of_records(), opts :: Keyword.t()) :: [
  {:ok | :error, record()}
]

Produce a batch of records.

It expects a list of Klife.Record structs, each containing at least :value and :topic, and returns a list of ok/error tuples with the enriched version of each input record, as described in "Producer API overview".

The order of the response tuples in the returned list is the same as in the input list. That means the first response tuple relates to the first input record, and so on.

Semantics and guarantees

This function is semantically equivalent to calling produce/2 multiple times and waiting for all responses, which means that two records sent in the same batch may succeed or fail independently.

In other words, there are no atomicity guarantees. If you need them, see produce_batch_txn/2.

The input list may contain records for any topic/partition. For records of the same topic/partition, their relative order is guaranteed to match the input order. For records of different topics/partitions, no relative order is guaranteed.

See the partial error example below for more context.

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

Examples

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])

Partial error example:

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: :rand.bytes(2_000_000), topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:error, %Klife.Record{error_code: 10}}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])

To simplify response handling, you can use the Klife.Record.verify_batch/1 or Klife.Record.verify_batch!/1 functions.
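
For comparison, handling a partial failure by hand could look like the sketch below. BatchResult is a hypothetical helper, not part of Klife, and plain maps stand in for Klife.Record structs so the snippet runs without a broker. It relies only on the documented return shape, a list of {:ok | :error, record} tuples.

```elixir
defmodule BatchResult do
  # Splits a produce_batch/2-style response into succeeded and failed records,
  # preserving the input order inside each group.
  def split(responses) do
    {oks, errors} = Enum.split_with(responses, fn {status, _rec} -> status == :ok end)
    {Enum.map(oks, &elem(&1, 1)), Enum.map(errors, &elem(&1, 1))}
  end
end

# Stand-in response: the second record failed with error code 10.
responses = [
  {:ok, %{topic: "my_topic_1", offset: 10}},
  {:error, %{topic: "my_topic_2", error_code: 10}},
  {:ok, %{topic: "my_topic_3", offset: 3}}
]

{succeeded, failed} = BatchResult.split(responses)
2 = length(succeeded)
[%{topic: "my_topic_2"}] = failed
```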

produce_batch_async(record, opts)

@callback produce_batch_async(record(), opts :: Keyword.t()) :: :ok

Produce a batch of records asynchronously.

The same as produce_batch/2 but returns immediately. Accepts a callback option to execute arbitrary code after the response is obtained.

Semantics and guarantees

When a callback is provided, this function is implemented as a Task.start/1 that calls produce_batch/2 and executes the callback right after. Therefore, there are no guarantees about record delivery or callback execution.

Beware of process limits

Because this function spawns a new process for every call that defines a callback, calling it inside loops may spawn a large number of processes.

To avoid this, you can call it with larger batches or raise the Erlang VM's process limit (the +P emulator flag).
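
One way to keep the number of calls, and therefore spawned processes, low is to group records into larger chunks first. The sketch below assumes a hypothetical ChunkedProducer helper; produce_fun stands in for something like `&MyClient.produce_batch_async(&1, callback: cb)`, and plain maps stand in for Klife.Record structs so the snippet runs on its own.

```elixir
defmodule ChunkedProducer do
  # Calls produce_fun once per chunk instead of once per record.
  def produce_in_chunks(records, chunk_size, produce_fun)
      when is_integer(chunk_size) and chunk_size > 0 do
    records
    |> Enum.chunk_every(chunk_size)
    |> Enum.map(produce_fun)
  end
end

records = for i <- 1..10, do: %{value: "my_val_#{i}", topic: "my_topic_1"}

# Stand-in produce function that just reports the size of each batch.
calls = ChunkedProducer.produce_in_chunks(records, 4, fn batch -> length(batch) end)

# 10 records with a chunk size of 4 give 3 calls instead of 10.
[4, 4, 2] = calls
```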

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

  • :callback (term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument for an MFA and is the only argument for anonymous functions.

Examples

Anonymous Function:

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: fn resp ->
...>  [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end)

Using MFA:

iex> defmodule CB2 do
...>    def exec(resp, my_arg1, my_arg2) do
...>      "arg1" = my_arg1
...>      "arg2" = my_arg2
...>       [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...>    end
...> end
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: {CB2, :exec, ["arg1", "arg2"]})

Transaction API

produce_batch_txn(list_of_records, opts)

@callback produce_batch_txn(list_of_records(), opts :: Keyword.t()) ::
  {:ok, list_of_records()} | {:error, list_of_records()}

Transactionally produce a batch of records.

It expects a list of Klife.Record structs, each containing at least :value and :topic, and returns an ok/error tuple with the enriched version of the input records, as described in "Producer API overview".

The order of the records in the returned list is the same as in the input list. That means the first returned record relates to the first input record, and so on.

Beware of performance costs

Each produce_batch_txn/2 call requires two more network round trips to the broker than a non-transactional produce_batch/2: one to add the topic/partitions to the transaction and another to commit or abort it.

Options

  • :pool_name (atom/0) - Txn pool's name that will override the default_txn_pool configuration.

Examples

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.produce_batch_txn([rec1, rec2, rec3])

transaction(fun, opts)

@callback transaction(fun :: function(), opts :: Keyword.t()) :: any()

Runs the given function inside a transaction.

Every produce API call made inside the given function will be part of a transaction that only commits if the return value of fun is :ok or {:ok, _any}. Any other return value aborts all records produced inside the given function.

Beware of performance costs

Each produce call inside the input function may require one more network round trip to the broker than a normal non-transactional call.

At the end of the transaction another round trip is needed in order to commit or abort the transaction.

Produce semantics inside transaction

All produce API calls keep the same semantics they have outside a transaction. This means that records produced using produce_batch/2 may still succeed or fail independently, and a produce/2 call may still fail. Therefore, it is the user's responsibility to verify the results and abort the transaction if needed.
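
A minimal sketch of that verification step follows. TxnResult is a hypothetical helper, not part of Klife; it maps a produce_batch/2-style response to a value that commits the transaction ({:ok, records}) or aborts it (anything else), following the commit rule stated above. Plain maps stand in for Klife.Record structs.

```elixir
defmodule TxnResult do
  # Returns {:ok, records} only when every response tuple is {:ok, record}.
  # Any failure yields {:error, responses}, which is not :ok or {:ok, _any}
  # and so would abort the surrounding transaction.
  def from_batch(responses) do
    if Enum.all?(responses, &match?({:ok, _}, &1)) do
      {:ok, Enum.map(responses, fn {:ok, rec} -> rec end)}
    else
      {:error, responses}
    end
  end
end

# All records succeeded: the transaction function may return this to commit.
{:ok, [%{offset: 1}, %{offset: 2}]} =
  TxnResult.from_batch([{:ok, %{offset: 1}}, {:ok, %{offset: 2}}])

# One record failed: returning this value aborts instead.
{:error, _responses} =
  TxnResult.from_batch([{:ok, %{offset: 1}}, {:error, %{error_code: 10}}])
```

Inside a transaction, the function passed to MyClient.transaction/2 could end with `TxnResult.from_batch(MyClient.produce_batch(recs))`, so that a partial failure does not commit.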

Options

  • :pool_name (atom/0) - Txn pool's name that will override the default_txn_pool configuration.

Examples

iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.transaction(fn ->
...>  rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
...>  {:ok, resp1} = MyClient.produce(rec1)
...>  rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
...>  rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
...>  [resp2, resp3] = MyClient.produce_batch([rec2, rec3])
...>  {:ok, [resp1, resp2, resp3]}
...> end)

Types

@type list_of_records() :: [record()]
@type record() :: Klife.Record.t()