Klife.Consumer.Fetcher (Klife v1.0.0)
View SourceDefines a fetcher.
A fetcher is responsible for batching and sending fetch requests to Kafka brokers. Klife groups all topic-partitions that share the same broker leader into the same fetcher batcher, so most requests are sent as a single batched TCP call per broker, maximizing network efficiency.
Fetchers are configured as part of Klife.Client setup. Most users interact with fetchers
indirectly through a Klife.Consumer.ConsumerGroup, which manages offset tracking, rebalancing,
and dispatching under the hood. The consumer group creates or shares fetchers according to its
:fetch_strategy configuration, see the Klife.Consumer.ConsumerGroup docs for details.
For advanced use cases that require full control over offset management and partition assignment
(i.e. standalone consumers), fetchers can also be used directly through the Klife.Client fetch API.
This gives you the same building blocks that the consumer group uses internally.
Client configurations
:name- Required. Fetcher name. Must be unique per client. Can be passed as an option for consumers:client_id(String.t/0) - String used on all requests. If not provided the following string is used: "klife_fetcher.{client_name}.{fetcher_name}":linger_ms(non_neg_integer/0) - The maximum time to wait for additional record requests from consumers before sending a batch to the broker. The default value is0.:max_bytes_per_request(non_neg_integer/0) - The maximum amount of bytes to be returned in a single fetch request. The default value is5000000.:max_in_flight_requests(non_neg_integer/0) - The maximum number of fetch requests per batcher the fetcher will send before waiting for responses. The default value is3.:batchers_count(pos_integer/0) - The number of batchers per broker the fetcher will start. Defaults toceil(schedulers_online / known_brokers_count):request_timeout_ms(non_neg_integer/0) - The maximum amount of time the fetcher will wait for a broker response to a request before considering it as failed. The default value is30000.:max_wait_ms(non_neg_integer/0) - Define for how long the broker may wait (for more records) before send a response to the client The default value is0.
How many fetchers?
By default, all fetch requests use a single default fetcher configured with standard settings, maximizing batch efficiency.
There are two main reasons to consider using multiple fetchers:
Different configurations for specific topics: Some topics may benefit from unique settings. You can assign topics with similar needs to the same fetcher.
Fault isolation: Each fetcher has an independent request pipeline. Using multiple fetchers can prevent issues in one topic from affecting fetch latency of others.
Consider this scenario:
- A fetcher batches fetch requests for topics A and B into a single request to the broker leader.
- For some reason, topic B is temporarily unavailable and the broker responds slowly.
- Because both topics share the same batcher pipeline, fetch responses for topic A are delayed
waiting on topic B's slow response.If this presents a potential issue for your use case, consider creating multiple fetchers and dedicating some of them to critical topics. However, note that this may slightly reduce batch efficiency in normal operation.
Batchers count
Each fetcher starts a configurable number of batchers for each broker in the Kafka cluster. Topic-partitions with the same leader are managed by the same batcher.
Unlike producers, fetchers default batchers_count to ceil(schedulers_online / known_brokers_count)
rather than 1. This is because fetch responses are typically large and deserialization happens inside
the batcher, a single batcher per broker can easily become a CPU bottleneck.
You may still want to tune this value:
- Lower values improve batch efficiency at the cost of parallelism.
- Higher values improve parallelism but may reduce batch efficiency.
Dynamic batching
Like the producer, the fetcher uses dynamic batching. Fetch requests that arrive while the
batcher is waiting for in-flight responses are automatically accumulated into the next batch.
As a result, it is rarely necessary to set linger_ms to a value greater than zero.
Increasing linger_ms may be helpful only if you set a high value for max_in_flight_requests
or if you need to limit request rates to the broker for specific reasons.
Isolation level
The isolation_level is a per-request option passed through the Klife.Client fetch API
(defaulting to :read_committed). It controls whether fetch responses include uncommitted
transactional records:
:read_committed: only returns records from committed transactions. Recommended for most use cases.:read_uncommitted: returns all records, including those from transactions that may later be aborted. Use this only when you need the lowest possible latency and can tolerate reading uncommitted data.
Each fetcher maintains batchers for both isolation levels, so a single fetcher can serve requests with different isolation levels without any additional configuration.
Filtering fetched records
Fetch responses may include records you want to skip. For example, records before your
starting offset, Kafka control records (transaction markers), or records from aborted
transactions. Use Klife.Record.filter_records/2 to remove them:
{:ok, records} = MyClient.fetch("my_topic", 0, 42)
filtered =
Klife.Record.filter_records(records,
base_offset: 42,
exclude_control: true,
exclude_aborted: true
)See the Klife.Record docs for further detail.
Client default fetcher
If a fetch call is made without specifying a fetcher, the client default fetcher is used.
The client default fetcher can be configured as part of the overall Klife.Client setup.
Refer to Klife.Client documentation for more details.