Klife.Client behaviour (Klife v0.5.0)
Defines a Kafka client.
To use it, follow these three steps:
- Use it in a module
- Configure the module in your config file
- Start the module in your supervision tree
Using it in a module
When used, it expects an :otp_app option naming the OTP application that holds the client configuration.
defmodule MyApp.MyClient do
  use Klife.Client, otp_app: :my_app
end
use Klife.Client
When you use Klife.Client, it will extend your module in two ways:
- Define it as a proxy to a subset of the functions on the Klife module, using its module name as the client_name parameter. One example of this is MyClient.produce/2, which forwards both arguments to Klife.produce/3 and injects MyClient as the second argument.
- Define it as a supervisor by calling use Supervisor and implementing some related functions such as start_link/1 and init/1, so it can be started under your app's supervision tree.
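The proxy half of this behaviour can be pictured with a small sketch. It does not reproduce Klife's actual macro: KlifeStandIn, ProxySketch and Demo.MyClient are hypothetical names invented for illustration, and the stand-in function only echoes its arguments instead of talking to Kafka. It shows only how a `use` macro can define a produce/2 that injects the using module as the second argument of a three-argument function.

```elixir
# Hypothetical stand-in for Klife.produce/3: it just echoes what it received.
defmodule KlifeStandIn do
  def produce(record, client_name, opts) do
    {:ok, %{record: record, client: client_name, opts: opts}}
  end
end

# Hypothetical macro module illustrating the proxy idea only.
defmodule ProxySketch do
  defmacro __using__(_opts) do
    quote do
      # __MODULE__ is expanded in the module that calls `use ProxySketch`,
      # so the using module is injected as the second argument.
      def produce(record, opts \\ []) do
        KlifeStandIn.produce(record, __MODULE__, opts)
      end
    end
  end
end

defmodule Demo.MyClient do
  use ProxySketch
end

{:ok, echoed} = Demo.MyClient.produce(%{value: "v", topic: "t"}, producer: :p)
IO.inspect(echoed.client)
```

The real Klife.Client also sets the module up as a supervisor, which this sketch leaves out.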
Configuration
The client has several configuration options, described below. A typical configuration looks something like this:
config :my_app, MyApp.MyClient,
  connection: [
    bootstrap_servers: ["localhost:19092", "localhost:29092"],
    ssl: false
  ],
  producers: [
    [
      name: :my_custom_producer,
      linger_ms: 5,
      max_in_flight_requests: 10
    ]
  ],
  topics: [
    [name: "my_topic_0", producer: :my_custom_producer]
  ]
You can see more configuration examples in the "Client configuration examples" section or a working application example in the example folder of the project's repository.
Configuration options:
- :connection (non-empty keyword/0) - Required.
  - :bootstrap_servers (list of String.t/0) - Required. List of servers to establish the initial connection. (eg: ["localhost:9092", "localhost:9093"])
  - :ssl (boolean/0) - Specify the underlying socket module. Use :ssl if true and :gen_tcp if false. The default value is false.
  - :connect_opts (list of term/0) - Options used to configure the socket connection, which are forwarded to the connect/3 function of the underlying socket module (see ssl option above). The default value is [inet_backend: :socket, active: false].
  - :socket_opts (list of term/0) - Options used to configure the open socket, which are forwarded to the setopts/2 function of the underlying socket module, :inet for :gen_tcp and :ssl for :ssl (see ssl option above). The default value is [keepalive: true].
  - :sasl_opts (list of term/0) - Options to configure SASL authentication, see SASL section for supported mechanisms and examples. The default value is [].
- :default_producer (atom/0) - Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option. If not provided, a default producer will be started automatically. The default value is :klife_default_producer.
- :default_partitioner (atom/0) - Partitioner module to be used on produce API calls when a specific partitioner is not provided via configuration or option. The default value is Klife.Producer.DefaultPartitioner.
- :default_txn_pool (atom/0) - Name of the txn pool to be used on transactions when a :pool_name is not provided as an option. If not provided, a default txn pool will be started automatically. The default value is :klife_default_txn_pool.
- :txn_pools (list of Klife.TxnProducerPool configurations) - List of configurations, each starting a pool of transactional producers for use with the transactional API. The default value is [].
- :producers (list of Klife.Producer configurations) - List of configurations, each starting a new producer for use with the produce API. The default value is [].
- :topics (list of Klife.Topic configurations) - List of topics that may have special configurations. The default value is [].
- :disabled_features (list of atoms representing features to disable) - :producer disables the producer feature. :txn_producer disables transactions. The default value is [].
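As a rough illustration of how several of the options above might be combined, the fragment below configures a plain TCP connection, one named producer used as the default, and disables transactions. It uses only option names listed above. The server address, the producer name and the decision to disable :txn_producer are assumptions made for the example, and pointing :default_producer at an entry of :producers is assumed to be valid based on the option descriptions.

```elixir
import Config

config :my_app, MyApp.MyClient,
  connection: [
    # Assumed local broker address, adjust to your environment
    bootstrap_servers: ["localhost:9092"],
    # false selects :gen_tcp as the socket module (the documented default)
    ssl: false,
    # Forwarded to setopts/2 of the socket module (the documented default)
    socket_opts: [keepalive: true]
  ],
  # Assumption: use the producer declared below for calls without :producer
  default_producer: :my_custom_producer,
  producers: [
    [name: :my_custom_producer, linger_ms: 5]
  ],
  # Assumption for this example: the application never uses transactions
  disabled_features: [:txn_producer]
```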
Starting it
Finally, the client must be started in your application's supervision tree. It will look something like this:
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # some other modules...,
      MyApp.MyClient
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Producer API overview
To interact with the producer API, you will work with the Klife.Record module as your main input and output data structure.
Usually you will give a record to some producer API function and it will return an enriched record with some new attributes based on what happened.
In summary, the interaction goes like this:
- Build one or more Klife.Record structs
- Pass them to some producer API function
- Receive an enriched version of the provided records
rec = %Klife.Record{value: "some_val", topic: "my_topic_1"}
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec)
Summary
Producer API
Produce a single record.
Produce a single record asynchronously.
Produce a batch of records.
Produce a batch of records asynchronously.
Transaction API
Transactionally produce a batch of records.
Runs the given function inside a transaction.
Producer API
Produce a single record.
It expects a Klife.Record struct containing at least :value and :topic and returns an ok/error tuple along with the enriched version of the input record, as described in "Producer API Overview".
Options
- :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.
- :partitioner (atom/0) - Module that will override the default_partitioner configuration.
Examples
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)
Produce a single record asynchronously.
The same as produce/2 but returns immediately. Accepts a callback option to execute arbitrary code after the response is obtained.
Semantics and guarantees
This function executes the callback using Task.start/1. Therefore, there are no guarantees about record delivery or callback execution.
Options
- :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.
- :partitioner (atom/0) - Module that will override the default_partitioner configuration.
- :callback (term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions.
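The two callback shapes and the fire-and-forget semantics described above can be sketched in plain Elixir. CallbackSketch is a hypothetical module written for this illustration and is not Klife's implementation. It only shows how a result can be prepended to an MFA's argument list or passed as the single argument of an anonymous function, and how Task.start/1 runs that work in an unlinked process whose outcome the caller never sees.

```elixir
defmodule CallbackSketch do
  # MFA form: the result becomes the first argument, followed by the given args.
  def run_callback({mod, fun, args}, result)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [result | args])
  end

  # Anonymous function form: the result is the only argument.
  def run_callback(fun, result) when is_function(fun, 1) do
    fun.(result)
  end

  # Fire and forget: the caller gets :ok right away and never learns
  # whether the callback ran or crashed.
  def async(result, callback) do
    {:ok, _pid} = Task.start(fn -> run_callback(callback, result) end)
    :ok
  end
end
```

Because the task is not linked or awaited, a crash inside the callback does not propagate to the caller, which is why no delivery or execution guarantee can be given.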
Examples
Anonymous Function:
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> callback = fn resp ->
...>   {:ok, enriched_rec} = resp
...>   true = is_number(enriched_rec.offset)
...>   true = is_number(enriched_rec.partition)
...> end
iex> :ok = MyClient.produce_async(rec, callback: callback)
Using MFA:
iex> defmodule CB do
...>   def exec(resp, my_arg1, my_arg2) do
...>     "my_arg1" = my_arg1
...>     "my_arg2" = my_arg2
...>     {:ok, enriched_rec} = resp
...>     true = is_number(enriched_rec.offset)
...>     true = is_number(enriched_rec.partition)
...>   end
...> end
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> :ok = MyClient.produce_async(rec, callback: {CB, :exec, ["my_arg1", "my_arg2"]})
@callback produce_batch(list_of_records(), opts :: Keyword.t()) :: [ {:ok | :error, record()} ]
Produce a batch of records.
It expects a list of Klife.Record structs containing at least :value and :topic and returns a list of ok/error tuples along with the enriched version of each input record, as described in "Producer API Overview".
The order of the response tuples in the returned list is the same as in the input list. That means the first response tuple relates to the first input record, and so on.
Semantics and guarantees
- This function is semantically equivalent to calling produce/2 multiple times and waiting for all responses. This means that two records sent in the same batch may succeed or fail independently. In other words, there are no atomicity guarantees. If you need them, see produce_batch_txn/2.
- The input list may contain records for any topic/partition. Records for the same topic/partition keep the order of the input; no order is guaranteed between records of different topics/partitions.
See the partial error example below for more context.
Options
- :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.
- :partitioner (atom/0) - Module that will override the default_partitioner configuration.
Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])
Partial error example:
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: :rand.bytes(2_000_000), topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:error, %Klife.Record{error_code: 10}}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])
To make response handling easier, you can use the Klife.Record.verify_batch/1 or Klife.Record.verify_batch!/1 functions.
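For cases where you want to inspect partial failures yourself, a batch response can also be split by hand. The sketch below is an illustration only: BatchResults is a hypothetical helper, it does not use the verify_batch functions, and in the usage shown plain maps stand in for Klife.Record structs.

```elixir
defmodule BatchResults do
  # Splits a list of {:ok, record} | {:error, record} tuples into two lists
  # of records, each keeping the relative order of the input.
  def split(results) when is_list(results) do
    {oks, errors} = Enum.split_with(results, &match?({:ok, _}, &1))

    %{
      ok: Enum.map(oks, fn {:ok, rec} -> rec end),
      error: Enum.map(errors, fn {_status, rec} -> rec end)
    }
  end
end

# Plain maps stand in for Klife.Record structs here.
results = [{:ok, %{offset: 1}}, {:error, %{error_code: 10}}, {:ok, %{offset: 2}}]
IO.inspect(BatchResults.split(results))
```

With a real response, the error entries would be the records carrying an error_code, which could then be retried or logged.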
Produce a batch of records asynchronously.
The same as produce_batch/2 but returns immediately. Accepts a callback option to execute arbitrary code after the response is obtained.
Semantics and guarantees
When a callback is provided, this function is implemented as a Task.start/1 calling produce_batch/2 and executing the callback right after. Therefore, there are no guarantees about record delivery or callback execution.
Beware of process limits
Because this function spawns a new process for every call that defines a callback, calling it inside loops may spawn a large number of processes.
To avoid this, you can send larger batches per call or raise the system's process limit with the Erlang +P flag.
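One way to act on the first suggestion is to group records into larger chunks before each asynchronous call, so that one process is spawned per chunk rather than per record. The sketch below is a minimal illustration: ChunkedAsync is a hypothetical helper, and the chunk size of 500 in the usage comment is an arbitrary assumption, not a recommended value.

```elixir
defmodule ChunkedAsync do
  # Calls send_fun once per chunk of at most chunk_size records.
  # With a callback-based async produce, this means one spawned process
  # per chunk instead of one per record.
  def each_chunk(records, chunk_size, send_fun)
      when is_list(records) and is_integer(chunk_size) and chunk_size > 0 and
             is_function(send_fun, 1) do
    records
    |> Enum.chunk_every(chunk_size)
    |> Enum.each(send_fun)
  end
end

# Possible usage with a client module, assuming `records` and `cb` exist:
#
#   ChunkedAsync.each_chunk(records, 500, fn chunk ->
#     MyClient.produce_batch_async(chunk, callback: cb)
#   end)
```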
Options
- :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.
- :partitioner (atom/0) - Module that will override the default_partitioner configuration.
- :callback (term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions.
Examples
Anonymous Function:
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: fn resp ->
...>   [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end)
Using MFA:
iex> defmodule CB2 do
...>   def exec(resp, my_arg1, my_arg2) do
...>     "arg1" = my_arg1
...>     "arg2" = my_arg2
...>     [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...>   end
...> end
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: {CB2, :exec, ["arg1", "arg2"]})
Transaction API
@callback produce_batch_txn(list_of_records(), opts :: Keyword.t()) :: {:ok, list_of_records()} | {:error, list_of_records()}
Transactionally produce a batch of records.
It expects a list of Klife.Record structs containing at least :value and :topic and returns an ok/error tuple along with the enriched version of the input records, as described in "Producer API Overview".
The order of the records in the returned list is the same as in the input list. That means the first returned record relates to the first input record, and so on.
Beware of performance costs
Each produce_batch_txn/2 call needs 2 more network roundtrips to the broker than a non-transactional produce_batch/2: one to add the topics/partitions to the transaction and another to commit or abort it.
Options
- :pool_name (atom/0) - Txn pool's name that will override the default_txn_pool configuration.
Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.produce_batch_txn([rec1, rec2, rec3])
Runs the given function inside a transaction.
Every produce API call made inside the given function will be part of a transaction that will only commit if the return value of fun is :ok or {:ok, _any}. Any other return value will abort all records produced inside the given function.
Beware of performance costs
Each produce call inside the given function may need 1 more network roundtrip to the broker than a normal non-transactional call.
At the end of the transaction, another roundtrip is needed to commit or abort it.
Produce semantics inside transaction
All produce API calls keep the same semantics they have outside a transaction. This means that records produced using produce_batch/2 may still succeed or fail independently, and a produce/2 call may still fail. Therefore, it is the user's responsibility to verify the results and abort the transaction if needed.
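Since only :ok or {:ok, _any} commits, one way to meet this responsibility is to turn a batch response into a single ok/error value and make it the return value of the transaction function. The sketch below is an illustration under that assumption: TxnOutcome is a hypothetical helper, and the Klife.Record.verify_batch/1 function mentioned earlier may already cover this need.

```elixir
defmodule TxnOutcome do
  # Collapses a list of {:ok, record} | {:error, record} tuples into one value:
  # {:ok, records} when every entry succeeded, {:error, results} otherwise.
  def from_batch(results) when is_list(results) do
    if Enum.all?(results, &match?({:ok, _}, &1)) do
      {:ok, Enum.map(results, fn {:ok, rec} -> rec end)}
    else
      {:error, results}
    end
  end
end

# Possible usage inside a transaction, assuming rec1 and rec2 exist:
#
#   MyClient.transaction(fn ->
#     [rec1, rec2]
#     |> MyClient.produce_batch()
#     |> TxnOutcome.from_batch()
#   end)
```

Returning the {:error, results} tuple from the function makes the transaction abort, while keeping the individual results available for inspection.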
Options
- :pool_name (atom/0) - Txn pool's name that will override the default_txn_pool configuration.
Examples
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.transaction(fn ->
...>   rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
...>   {:ok, resp1} = MyClient.produce(rec1)
...>   rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
...>   rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
...>   [resp2, resp3] = MyClient.produce_batch([rec2, rec3])
...>   {:ok, [resp1, resp2, resp3]}
...> end)
Types
@type list_of_records() :: [record()]
@type record() :: Klife.Record.t()