View Source Klife.Client behaviour (Klife v0.5.0)
Defines a kafka client.
To use it you must do 3 steps:
- Use it in a module
- Config the module on your config file
- Start the module on your supervision tree
Using it in a module
When used it expects an :otp_app option that is the OTP application that has the client configuration.
defmodule MyApp.MyClient do
use Klife.Client, otp_app: :my_app
end
use Klife.ClientWhen you
use Klife.Client, it will extend your module in two ways:
Define it as a proxy to a subset of the functions on
Klifemodule, using it's module's name as theclient_nameparameter. One example of this is theMyClient.produce/2that forwards both arguments toKlife.produce/3and injectMyClientas the second argument.Define it as a supervisor by calling
use Supervisorand implementing some related functions such asstart_link/1andinit/1, so it can be started under on your app supervision tree.
Configuration
The client has a bunch of configuration options, you can read more below. But it will look somehting like this:
config :my_app, MyApp.MyClient,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
producers: [
[
name: :my_custom_producer,
linger_ms: 5,
max_in_flight_requests: 10
]
],
topics: [
[name: "my_topic_0", producer: :my_custom_producer]
]
You can see more configuration examples on the "Client configuration examples" section
or an working application example on the example folder on the project's repository.
Configuration options:
:connection(non-emptykeyword/0) - Required.:bootstrap_servers(list ofString.t/0) - Required. List of servers to establish the initial connection. (eg: ["localhost:9092", "localhost:9093"]):ssl(boolean/0) - Specify the underlying socket module. Use:sslif true and:gen_tcpif false. The default value isfalse.:connect_opts(list ofterm/0) - Options used to configure the socket connection, which are forwarded to theconnect/3function of the underlying socket module (see ssl option above.). The default value is[inet_backend: :socket, active: false].:socket_opts(list ofterm/0) - Options used to configure the open socket, which are forwarded to thesetopts/2function of the underlying socket module:inetfor:gen_tcpand:sslfor:ssl(see ssl option above.). The default value is[keepalive: true].:sasl_opts(list ofterm/0) - Options to configure SASL authentication, see SASL section for supported mechanisms and examples. The default value is[].
:default_producer(atom/0) - Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option. If not provided a default producer will be started automatically. The default value is:klife_default_producer.:default_partitioner(atom/0) - Partitioner module to be used on produce API calls when a specific partitioner is not provided via configuration or option. The default value isKlife.Producer.DefaultPartitioner.:default_txn_pool(atom/0) - Name of the txn pool to be used on transactions when a:pool_nameis not provided as an option. If not provided a default txn pool will be started automatically. The default value is:klife_default_txn_pool.:txn_pools(List ofKlife.TxnProducerPoolconfigurations) - List of configurations, each starting a pool of transactional producers for use with transactional api. The default value is[].:producers(List ofKlife.Producerconfigurations) - List of configurations, each starting a new producer for use with produce api. The default value is[].:topics(List ofKlife.Topicconfigurations) - List of topics that may have special configurations The default value is[].:disabled_features(List atoms representing a features to disable.) -:producerdisable producer feature.:txn_producerdisables transactions. The default value is[].
Starting it
Finally, it must be started on your application. It will look something like this:
defmodule MyApp.Application do
def start(_type, _args) do
children = [
# some other modules...,
MyApp.MyClient
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
endProducer API overview
In order to interact with the producer API you will work with Klife.Record module
as your main input and output data structure.
Usually you will give an record to some producer API function and it will return an enriched record with some new attributes based on what happened.
So in summary the interaction goes like this:
- Build one or more
Klife.Record - Pass it to some producer API function
- Receive an enriched version of the provided records
rec = %Klife.Record{value: "some_val", topic: "my_topic_1"}
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec)
Summary
Producer API
Produce a single record.
Produce a single record asynchronoulsy.
Produce a batch of records.
Produce a batch of records asynchronoulsy.
Transaction API
Transactionally produce a batch of records.
Runs the given function inside a transaction.
Producer API
Produce a single record.
It expects a Klife.Record struct containg at least :value and :topic and returns
an ok/error tuple along side with the enriched version of the input record as described
in "Producer API Overview".
Options
:producer(atom/0) - Producer's name that will override thedefault_producerconfiguration. Ignored inside transactions.:partitioner(atom/0) - Module that will overridedefault_partitionerconfiguration.
Examples
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)
Produce a single record asynchronoulsy.
The same as produce/2 but returns immediately. Accepts a callback
option to execute arbitrary code after response is obtained.
Semantics and guarantees
This functions executes the callback using
Task.start/1. Therefore there is no guarantees about record delivery or callback execution.
Options
:producer(atom/0) - Producer's name that will override thedefault_producerconfiguration. Ignored inside transactions.:partitioner(atom/0) - Module that will overridedefault_partitionerconfiguration.:callback(term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions
Examples
Anonymous Function:
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> callback = fn resp ->
...> {:ok, enriched_rec} = resp
...> true = is_number(enriched_rec.offset)
...> true = is_number(enriched_rec.partition)
...> end
iex> :ok = MyClient.produce_async(rec, callback: callback)
Using MFA:
iex> defmodule CB do
...> def exec(resp, my_arg1, my_arg2) do
...> "my_arg1" = my_arg1
...> "my_arg2" = my_arg2
...> {:ok, enriched_rec} = resp
...> true = is_number(enriched_rec.offset)
...> true = is_number(enriched_rec.partition)
...> end
...> end
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> :ok = MyClient.produce_async(rec, callback: {CB, :exec, ["my_arg1", "my_arg2"]})
@callback produce_batch(list_of_records(), opts :: Keyword.t()) :: [ {:ok | :error, record()} ]
Produce a batch of records.
It expects a list of Klife.Record structs containg at least :value and :topic and returns
a list of ok/error tuples along side with the enriched version of the input record as described
in "Producer API Overview".
The order of the response tuples on the returning list is the same as the input list. That means the first response tuple will be related to the first record on the input and so on.
Semantics and guarantees
This functions is semantically equivalent to call
produce/2multiple times and wait for all responses. Which means that 2 records sent on the same batch may succeed or fail independently.In other words that is no atomicity guarentees. If you need it see
produce_batch_txn/2.The input list may contain records related to any topic/partition, for records of the same topic/partition the order between them is guaranteed to be the same of the input, for records of different topic/partition no order is guaranteed between them.
See the partial error example below for more cotext.
Options
:producer(atom/0) - Producer's name that will override thedefault_producerconfiguration. Ignored inside transactions.:partitioner(atom/0) - Module that will overridedefault_partitionerconfiguration.
Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])Partial error example:
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: :rand.bytes(2_000_000), topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:error, %Klife.Record{error_code: 10}}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])In order to facilitate the response handling you can use Klife.Record.verify_batch/1 or
Klife.Record.verify_batch!/1 functions.
Produce a batch of records asynchronoulsy.
The same as produce_batch/2 but returns immediately. Accepts a callback
option to execute arbitrary code after response is obtained
Semantics and guarantees
When callback is provided this functions is implemented as
Task.start/1callingproduce_batch/2and executing the callback right after. Therefore there is no guarantees about record delivery or callback execution.
Beware of process limits
Because this function spawns a new process for every new call with a callback defined it may lead to a high number of processes to be spawned if it is executed inside loops.
In order to avoid this you can increase the batch size you are calling it or increase the system's process limit erlang flag.
Options
:producer(atom/0) - Producer's name that will override thedefault_producerconfiguration. Ignored inside transactions.:partitioner(atom/0) - Module that will overridedefault_partitionerconfiguration.:callback(term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions
Examples
Anonymous Function:
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: fn resp ->
...> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end)
Using MFA:
iex> defmodule CB2 do
...> def exec(resp, my_arg1, my_arg2) do
...> "arg1" = my_arg1
...> "arg2" = my_arg2
...> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end
...> end
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: {CB2, :exec, ["arg1", "arg2"]})
Transaction API
@callback produce_batch_txn(list_of_records(), opts :: Keyword.t()) :: {:ok, list_of_records()} | {:error, list_of_records()}
Transactionally produce a batch of records.
It expects a list of Klife.Record structs containg at least :value and :topic and returns
a tuple ok/error tuple along side with the enriched version of the input records as described
in "Producer API Overview".
The order of the response tuples on the returning list is the same as the input list. That means the first response tuple will be related to the first record on the input and so on.
Beware of performance costs
Each
produce_batch_txn/2will have 2 extra network roundtrips to the broker than a non transactionalproduce_batch/2. One for adding topic/partitions to the transaction and other to commit or abort it.
Options
:pool_name(atom/0) - Txn pool's name that will override thedefault_txn_poolconfiguration.
Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.produce_batch_txn([rec1, rec2, rec3])
Runs the given function inside a transaction.
Every produce API call made inside the given function will be part of a transaction
that will only commit if the returning value of fun is :ok or {:ok, _any}, any
other return value will abort all records produced inside the given function.
Beware of performance costs
Each produce call inside the input function may have 1 extra network roundtrip to the broker than a normal non transactional call.
At the end of the transaction another round trip is needed in order to commit or abort the transaction.
Produce semantics inside transaction
All produce API calls keeps the same semantics as they have outside a transaction. This means that records produced using
produce_batch/2may still succeed/fail independently and aproduce/2call may still fail. Therefore it is user's responsability to verify and abort the transaction if needed.
Options
:pool_name(atom/0) - Txn pool's name that will override thedefault_txn_poolconfiguration.
Examples
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.transaction(fn ->
...> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
...> {:ok, resp1} = MyClient.produce(rec1)
...> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
...> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
...> [resp2, resp3] = MyClient.produce_batch([rec2, rec3])
...> {:ok, [resp1, resp2, resp3]}
...> end)
Types
@type list_of_records() :: [record()]
@type record() :: Klife.Record.t()