Kaffe.Config.Consumer (Kaffe v1.28.0)
Configuration for Kaffe consumers.
The configuration options for the GroupMember consumer are a superset of those for Kaffe.Consumer.
The full list of supported config options can be found below, followed by an example configuration and handler sketch.
- `:endpoints` - The endpoints to consume from. Supports either host/port tuples (`[localhost: 9092]`, as an example) or URLs.
- `:heroku_kafka_env` - Endpoints and SSL configuration will be pulled from the ENV.
- `:consumer_group` - The Kafka consumer group the consumer is in. Should be unique to your app.
- `:topics` - A list of topics to consume from.
- `:message_handler` - The configured module to receive messages from the consumer (a handler sketch follows this list).
- `:async_message_ack` - If false, the Kafka offset will be acknowledged automatically after successful message parsing. If `async_message_ack` is true, you'll need to call `ack/2` to acknowledge Kafka messages as processed. Only use async processing if absolutely needed by your application's processing flow. With automatic (sync) acknowledgement, the message flow from `Kaffe.Consumer` has backpressure from your system. With manual (async) acknowledgement you will be able to process messages faster but will need to take on the burden of ensuring no messages are lost.
- `:start_with_earliest_message` - If true, the worker will consume from the beginning of the topic when it first starts. This only affects consumer behavior before the consumer group starts recording its offsets in Kafka.
- `:auto_start_producers` - If true, the `brod` client will spawn a producer automatically when a produce call is made without `brod:start_producer` having been called explicitly. Defaults to false. See the `brod` documentation for more details.
- `:allow_topic_auto_creation` - By default, `brod` respects what is configured in the broker about topic auto-creation, i.e. whether `auto.create.topics.enable` is set in the broker configuration. However, if `allow_topic_auto_creation` is set to false in the client config, `brod` will avoid sending metadata requests that may cause an auto-creation of the topic, regardless of the broker config. Defaults to false. See `brod` for more details.
- `:offset_commit_interval_seconds` - Defines the time interval between two OffsetCommitRequest messages. Defaults to 5 seconds.
- `:rebalance_delay_ms` - The time to allow for rebalancing among workers. The default is 10,000, which should give the consumers time to rebalance when scaling.
- `:max_bytes` - Limits the number of message bytes received from Kafka for a particular topic subscriber. The default is 1MB. This parameter might need tuning depending on the number of partitions in the topics being read (there is one subscriber per topic per partition). For example, if you are reading from two topics, each with 32 partitions, there is the potential of 64MB in buffered messages at any one time.
- `:min_bytes` - Sets a minimum threshold for the number of bytes to fetch for a batch of messages. The default is 0MB.
- `:max_wait_time` - Sets the maximum number of milliseconds the broker is allowed to spend collecting `min_bytes` of messages into a batch.
- `:subscriber_retries` - The number of times a subscriber will retry subscribing to a topic. Defaults to 5.
- `:subscriber_retry_delay_ms` - The number of milliseconds a subscriber will wait before reconnecting to a topic after a failure. Defaults to 5000. This only matters when `subscriber_retries` is greater than 0.
- `:client_down_retry_expire` - The amount of time, in ms, over which to attempt retries on a down client. Defaults to 30_000, with exponential backoff (currently not configurable).
- `:offset_reset_policy` - Controls how the subscriber handles an expired offset. See the Kafka consumer option `auto.offset.reset`. Valid values for this option are:
  - `:reset_to_earliest` - Reset to the earliest available offset.
  - `:reset_to_latest` - Reset to the latest offset.
  - `:reset_by_subscriber` - The subscriber receives the `OffsetOutOfRange` error.

  More information in the Brod consumer.
- `:worker_allocation_strategy` - Controls how workers are allocated with respect to consumed topics and partitions. Valid values are:
  - `:worker_per_partition` - The default (for backward compatibility). Allocates a single worker per partition across topics. This is useful for managing concurrent processing of messages that may be received from any consumed topic.
  - `:worker_per_topic_partition` - Allocates a worker per topic partition, meaning there will be a worker for every topic partition consumed. Unless you need to control concurrency across topics, you should use this strategy.
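
As a rough illustration of how these options fit together, here is a sketch of a consumer block in `config/config.exs`. The topic name, consumer group, and `MyApp.MessageHandler` module are placeholders, and the tuning values are illustrative rather than library defaults:

```elixir
# config/config.exs -- a minimal sketch; names and values are placeholders.
import Config

config :kaffe,
  consumer: [
    endpoints: [localhost: 9092],             # host/port tuples or URLs
    topics: ["my-topic"],                     # topics to consume from
    consumer_group: "my-app-consumer-group",  # should be unique to your app
    message_handler: MyApp.MessageHandler,    # module that receives messages

    # Optional tuning (illustrative values, not necessarily the defaults)
    offset_commit_interval_seconds: 5,
    rebalance_delay_ms: 10_000,
    max_bytes: 1_000_000,
    offset_reset_policy: :reset_to_latest,
    worker_allocation_strategy: :worker_per_topic_partition
  ]
```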
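
And a minimal sketch of a handler module for the default synchronous acknowledgement flow, assuming a `handle_message/1` callback that receives a message map and returns `:ok` (the `MyApp.MessageHandler` name and the printed output are purely illustrative):

```elixir
defmodule MyApp.MessageHandler do
  # Each consumed message arrives as a map including :key and :value
  # (along with topic/partition/offset metadata).
  def handle_message(%{key: key, value: value} = _message) do
    IO.puts("#{key}: #{value}")
    # Returning :ok lets the consumer acknowledge the offset automatically
    # in the default (sync) flow.
    :ok
  end
end
```

With `:async_message_ack` set to true, the handler is instead responsible for calling `ack/2` once processing completes, as described in the option list above, rather than relying on the automatic acknowledgement.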