Klife.Producer (Klife v0.5.0)
Defines a producer.
Client configurations
- `:name` - Required. Producer name. Must be unique per client. Can be used as an option on the producer API.
- `:client_id` (`String.t/0`) - String used on all requests for the client. If not provided, the following string is used: "klife_producer.{client_name}.{producer_name}"
- `:acks` (`:all` or 1) - The number of broker acks the producer requires before considering a request complete. `:all` means all ISR (in-sync replicas). The default value is `:all`.
- `:linger_ms` (`non_neg_integer/0`) - The maximum time to wait for additional messages before sending a batch to the broker. The default value is `0`.
- `:batch_size_bytes` (`non_neg_integer/0`) - The maximum size of the batch of messages that the producer will send to the broker in a single request. The default value is `512000`.
- `:delivery_timeout_ms` (`non_neg_integer/0`) - The maximum amount of time the producer will retry to deliver a message before timing out and failing the send. The default value is `60000`.
- `:request_timeout_ms` (`non_neg_integer/0`) - The maximum amount of time the producer will wait for a broker response to a request before considering it failed. The default value is `15000`.
- `:retry_backoff_ms` (`non_neg_integer/0`) - The amount of time the producer waits before retrying a failed request to the broker. The default value is `1000`.
- `:max_in_flight_requests` (`non_neg_integer/0`) - The maximum number of unacknowledged requests per broker the producer will send before waiting for acknowledgments. The default value is `1`.
- `:batchers_count` (`pos_integer/0`) - The number of batchers per broker the producer will start. See the Batchers Count section for more details. The default value is `1`.
- `:enable_idempotence` (`boolean/0`) - Indicates whether the producer will use Kafka idempotency capabilities for exactly-once semantics. The default value is `true`.
- `:compression_type` (`:none`, `:gzip` or `:snappy`) - The compression algorithm used to compress messages before they are sent to the broker. The default value is `:none`.
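For reference, here is a sketch of how a producer with these options might be declared as part of the client configuration. The module name MyApp.MyClient, the application name :my_app, and the broker address are illustrative assumptions; check the Klife.Client documentation for the authoritative configuration schema.

```elixir
# config/runtime.exs (illustrative)
import Config

config :my_app, MyApp.MyClient,
  connection: [
    bootstrap_servers: ["localhost:9092"]
  ],
  producers: [
    [
      name: :my_producer,
      acks: :all,
      linger_ms: 0,
      max_in_flight_requests: 1,
      enable_idempotence: true,
      compression_type: :snappy
    ]
  ]
```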
Interacting with producers
When configuring Klife.Client, users can specify a list of producers to be initialized for sending records to the Kafka cluster. Once configured, users can interact with these producers through the Klife.Client producer API.
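As a rough illustration (assuming a client module MyApp.MyClient defined with `use Klife.Client` and an existing topic "my_topic"; the shape of the error tuple is also an assumption), a produce call goes through the client's producer API rather than through the producer process directly:

```elixir
rec = %Klife.Record{value: "some value", key: "some key", topic: "my_topic"}

case MyApp.MyClient.produce(rec) do
  {:ok, %Klife.Record{partition: partition, offset: offset}} ->
    IO.puts("Stored on partition #{partition} at offset #{offset}")

  {:error, failed_rec} ->
    IO.inspect(failed_rec, label: "produce failed")
end
```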
How many producers?
By default, all records are produced using a single default producer configured with standard settings, maximizing batch efficiency.
There are two main reasons to consider using multiple producers:
- Different configurations for specific topics: Some topics may require unique settings. In this case, you can assign topics with similar configuration needs to the same producer.
- Fault isolation: Each producer has an independent queue of messages. Using multiple producers can help isolate issues in one topic from affecting the performance of others.
Let's dive into an example of fault isolation. Consider this scenario:
- A producer batches messages for topics A and B into a single request to the broker leader.
- For some reason, topic B is temporarily unavailable and fails, but topic A remains functional.
- The producer is configured with max_in_flight_requests = 1 and delivery_timeout_ms = 1 minute.
- This means the producer will retry sending records for topic B for up to one minute.
- Because max_in_flight_requests is set to 1, all other records, including those for topic A, must wait until retries for topic B are exhausted.
In this situation, issues with one topic can delay other topics handled by the same producer, as the retry mechanism occupies the single in-flight request slot.
If this scenario presents a potential issue for your use case, consider creating multiple producers and dedicating some of them to critical topics. However, note that this approach may slightly reduce batch efficiency in normal operation to improve resilience in specific failure scenarios, which may be infrequent.
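A sketch of that approach, assuming the topics/default_producer keys from the Klife.Client configuration (topic and producer names are illustrative):

```elixir
config :my_app, MyApp.MyClient,
  producers: [
    # Shared producer for regular traffic.
    [name: :default_producer],
    # Dedicated producer: its queue and in-flight slot are isolated from the rest.
    [name: :payments_producer]
  ],
  topics: [
    [name: "analytics_events"],
    [name: "payment_events", default_producer: :payments_producer]
  ]
```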
Order guarantees
Kafka guarantees message order only within the same topic and partition, so this is also the maximum level of ordering that the producer can provide.
However, certain scenarios can lead to records being produced to Kafka out of order. In this context, "out of order" means the following:
1. `rec1` and `rec2` are produced to the same topic and partition.
2. The user calls `produce(rec1)` before calling `produce(rec2)`.
3. Both produce calls complete successfully.
4. `rec1` is stored in the broker after `rec2`, resulting in `rec1.offset > rec2.offset`.
In this case, `rec1` and `rec2` are considered "out of order."
The ordering behavior for producers depends on their configuration:
- Records produced by different producers (even if targeting the same topic and partition) may be out of order since each producer operates independently and in parallel.
- Records produced by a producer with max_in_flight_requests > 1 and enable_idempotence set to false may be out of order due to network failures and retries.
- Any records produced by a producer with max_in_flight_requests = 1 have guaranteed ordering.
- Any records produced by a producer with enable_idempotence = true have guaranteed ordering.
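The same rules expressed as producer configurations (a sketch using only the options documented above; the producer names are illustrative):

```elixir
config :my_app, MyApp.MyClient,
  producers: [
    # Ordered: the broker deduplicates retries, so retries cannot reorder records.
    [name: :ordered_producer, max_in_flight_requests: 5, enable_idempotence: true],
    # Ordered: only one request can be outstanding at any time.
    [name: :serial_producer, max_in_flight_requests: 1, enable_idempotence: false],
    # May reorder on retry: several non-idempotent requests can be in flight at once.
    [name: :unordered_producer, max_in_flight_requests: 5, enable_idempotence: false]
  ]
```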
Dynamic batching
Klife's producer uses dynamic batching, which automatically accumulates records that cannot be sent immediately due to max_in_flight_requests limits. As a result, it is rarely necessary to set linger_ms to a value greater than zero.
Typically, increasing linger_ms can improve batching efficiency, which benefits high-throughput topics. However, if your topic already has high throughput, dynamic batching will likely handle batching effectively without adjusting linger_ms.
Increasing linger_ms may be helpful only if you set a very high value for max_in_flight_requests or if you need to limit request rates to the broker for specific reasons.
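If you do decide to trade latency for larger batches, linger_ms is the knob to turn. A minimal sketch using the configuration shape from the earlier examples (names and values are illustrative):

```elixir
config :my_app, MyApp.MyClient,
  producers: [
    # Default: dispatch as soon as an in-flight slot is free.
    [name: :low_latency_producer, linger_ms: 0],
    # Wait up to 50 ms before each dispatch to accumulate larger batches.
    [name: :batched_producer, linger_ms: 50, max_in_flight_requests: 10]
  ]
```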
Batchers Count
Each Klife producer starts a configurable number of batchers for each broker in the Kafka cluster. Topics and partitions with the same leader are managed by the same batcher.
By default, Klife initializes only one batcher per broker, which optimizes batch efficiency but may underutilize CPU resources on high-core systems.
Consider the following setup:
- A Kafka cluster with 3 brokers
- An application running on a machine with 64 cores
- batchers_count = 1
In this scenario, the application could encounter a performance bottleneck due to having only three batchers handling all record production requests. Increasing parallelism by adjusting batchers_count can help resolve this issue.
For instance, increasing batchers_count from 1 to 5 would create 15 batchers (5 per broker), potentially improving parallelism and CPU utilization.
Some caveats:
- The same topic and partition are always handled by the same batcher.
- Higher batcher counts reduce batch efficiency, which may lower overall throughput.
- The ideal setting may vary depending on your workload, so it's best to measure and adjust batchers_count based on your specific performance needs.
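A sketch of the 3-broker example above (the batcher totals follow from brokers × batchers_count; producer names are illustrative):

```elixir
config :my_app, MyApp.MyClient,
  producers: [
    # 3 brokers x 1 batcher  = 3 batchers (default, best batch efficiency).
    [name: :default_producer],
    # 3 brokers x 5 batchers = 15 batchers (more parallelism, smaller batches).
    [name: :parallel_producer, batchers_count: 5]
  ]
```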
Topic default producer
Each topic has a designated default producer, which is used by the producer client API for regular produce calls. While this default producer can be overridden with options in the client API, be mindful that doing so may affect order guarantees.
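For example (a sketch; the `:producer` option name on the produce call is an assumption to be confirmed against the Klife.Client producer API, and the record and producer names are illustrative):

```elixir
rec = %Klife.Record{value: "some value", topic: "my_topic"}

# Uses the topic's default producer.
{:ok, _rec} = MyApp.MyClient.produce(rec)

# Overrides the producer for this call only; ordering relative to records
# sent through the topic's default producer is no longer guaranteed.
{:ok, _rec} = MyApp.MyClient.produce(rec, producer: :my_other_producer)
```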
Client default producer
If a produce call is made to a topic without a predefined default producer, the client default producer is used. The client default producer can be configured as part of the overall Klife.Client setup. Refer to the Klife.Client documentation for more details.