Kaffe.Consumer (Kaffe v1.28.0)
Consume messages from Kafka and pass them to a given local module.
See start_link/4 for details on how to start a Consumer process.
As messages are consumed from Kafka they will be sent to your
handle_message/1 (sync) or handle_message/2 (async) functions for
processing in your system. Those functions must return :ok.
Kaffe.Consumer commits offsets to Kafka, which is very reliable but not immediate. If your application restarts, it is highly likely that you will reconsume some messages, especially on a fast-moving topic. Be ready!
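Because messages can be delivered more than once, it helps if the handler tolerates repeats. The following is a minimal sketch of a sync handler that skips a message it has already seen, keyed by topic, partition, and offset. The module name `DedupingHandler`, the `setup/0` function, and the ETS table name are hypothetical and not part of Kaffe. The in-memory ETS table only stands in for durable storage (for example a unique key in a database), since a table in memory does not survive the restart described above.

```elixir
defmodule DedupingHandler do
  # Hypothetical table name, chosen for illustration only.
  @table :seen_kafka_offsets

  # Create the tracking table once, e.g. from application start-up code.
  # The table is owned by the calling process and disappears with it.
  def setup do
    if :ets.whereis(@table) == :undefined do
      :ets.new(@table, [:named_table, :public, :set])
    end

    :ok
  end

  # In sync mode the handler receives the message map as its only argument.
  def handle_message(%{topic: topic, partition: partition, offset: offset} = message) do
    # :ets.insert_new/2 returns false when the key already exists,
    # which means this exact message was handled before.
    if :ets.insert_new(@table, {{topic, partition, offset}}) do
      process(message)
    end

    # The handler must return :ok.
    :ok
  end

  # Placeholder for the real work.
  defp process(%{value: value}), do: IO.puts("processing #{value}")
end
```

Handing the same message to `DedupingHandler.handle_message/1` twice runs `process/1` only once. Another option is to make the processing itself idempotent, for example by upserting on the message key.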
Summary
Functions
Acknowledge the Kafka message as processed.
Call the message handler with the restructured Kafka message.
Initialize the consumer loop.
Start a Kafka consumer
Types
@type message() :: %{
  key: binary(),
  value: binary(),
  topic: binary(),
  offset: non_neg_integer(),
  partition: non_neg_integer(),
  ts: non_neg_integer(),
  ts_type: :undefined | :create | :append,
  headers: list()
}
Functions
Acknowledge the Kafka message as processed.
- pid - the pid your handle_message/2 function was given as the first argument
- message - the Kafka message your handle_message/2 function was given as the second argument
Kaffe.Consumer.ack(pid, message)
Call the message handler with the restructured Kafka message.
Kafka messages come from brod as an Erlang record. To make processing simpler for clients we convert that to an Elixir map. Since the consumer can subscribe to multiple topics with multiple partitions we also add the topic and partition as additional fields.
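As an illustration of that conversion, here is a minimal sketch that turns a record tuple into a map and adds the topic and partition. It is not Kaffe's actual implementation. The module name `MessageShape` is hypothetical, and the tuple layout `{:kafka_message, offset, key, value, ts_type, ts, headers}` is an assumption about how brod's record appears from Elixir; check the record definition for your brod version.

```elixir
defmodule MessageShape do
  # Assumed record layout (verify against your brod version):
  # {:kafka_message, offset, key, value, ts_type, ts, headers}
  def to_map(topic, partition, {:kafka_message, offset, key, value, ts_type, ts, headers}) do
    %{
      # topic and partition are not part of the record; they are added here
      topic: topic,
      partition: partition,
      offset: offset,
      key: key,
      value: value,
      ts_type: ts_type,
      ts: ts,
      headers: headers
    }
  end
end

# Example values are made up for illustration.
record = {:kafka_message, 42, "user-1", "signed_up", :create, 1_700_000_000_000, []}
message = MessageShape.to_map("events", 0, record)
IO.inspect(message.topic)
```

The resulting map has the same keys as the message() type above, so a handler can pattern match on fields such as `topic` and `offset` directly.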
After converting the Kafka message, your message handler module's
handle_message function will be called.
If async is false:
- Your message handler module's handle_message/1 function will be called with the message
- The Consumer will block and wait for your handle_message function to complete and then automatically acknowledge the message as processed.
If async is true:
- Your message handler module's handle_message/2 function will be called with the pid of the running Consumer process and the message.
- Message intake on the Consumer will not wait for your handle_message/2 to complete and will not automatically acknowledge the message as processed.
- Once you've processed the message you will need to call Kaffe.Consumer.ack/2 with the pid and message.
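The async flow can be sketched roughly as follows. The module name `AsyncHandler` and the `process/1` helper are hypothetical; only `handle_message/2` and `Kaffe.Consumer.ack/2` come from the text above. The sketch uses an unsupervised `Task.start/1` for brevity, which is an assumption about how you might move the work off the Consumer process, not something Kaffe prescribes.

```elixir
defmodule AsyncHandler do
  # In async mode the handler receives the Consumer pid and the message map.
  def handle_message(pid, message) do
    # Run the work in a separate process so this function returns right away
    # and message intake is not blocked.
    Task.start(fn ->
      process(message)
      # Acknowledge only after processing has finished.
      Kaffe.Consumer.ack(pid, message)
    end)

    # The handler must return :ok.
    :ok
  end

  # Placeholder for the real work.
  defp process(%{value: value}), do: IO.puts("processing #{value}")
end
```

If the task crashes before the ack, the message is never acknowledged, so in a real application you would likely run the work under a `Task.Supervisor` and decide how failures should be retried.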
Initialize the consumer loop.
Start a Kafka consumer
The consumer pulls in values from the Kaffe consumer configuration. See more
details at Kaffe.Config.Consumer.
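For orientation, a consumer configuration might look roughly like the fragment below. The endpoint, topic, consumer group, and handler module are placeholder values, and the set of keys shown is an assumption based on typical Kaffe setups; confirm the supported keys for your version in Kaffe.Config.Consumer.

```elixir
# config/config.exs
import Config

config :kaffe,
  consumer: [
    # Placeholder broker address as a host/port keyword pair.
    endpoints: [localhost: 9092],
    # Placeholder topic and consumer group names.
    topics: ["example-topic"],
    consumer_group: "example-consumer-group",
    # Hypothetical module implementing handle_message/1 or handle_message/2.
    message_handler: MyApp.MessageHandler,
    # false selects the sync flow, true the async flow with manual acks.
    async_message_ack: false
  ]
```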