Kaffe.Consumer (Kaffe v1.26.0)

Consume messages from Kafka and pass them to a given local module.

See start_link/4 for details on how to start a Consumer process.

As messages are consumed from Kafka, they are sent to your handle_message/1 (sync) or handle_message/2 (async) function for processing in your system. These functions must return :ok.

Kaffe.Consumer commits offsets to Kafka, which is very reliable but not immediate. If your application restarts, it is highly likely to reconsume some messages, especially on a fast-moving topic. Be ready to handle duplicates!

Functions

ack(pid, message)

Acknowledge the Kafka message as processed.

  • pid - the pid your handle_message/2 function was given as the first argument
  • message - the Kafka message your handle_message/2 function was given as the second argument

    Kaffe.Consumer.ack(pid, message)

handle_message(topic, partition, msg, state)

Call the message handler with the restructured Kafka message.

Kafka messages come from brod as an Erlang record. To make processing simpler for clients we convert that to an Elixir map. Since the consumer can subscribe to multiple topics with multiple partitions we also add the topic and partition as additional fields.
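
The conversion described above can be sketched roughly as follows. This is an illustration rather than Kaffe's actual source: the module name RecordToMapSketch and the three record fields are assumptions made for the example, while the real record is defined by brod.

```elixir
defmodule RecordToMapSketch do
  require Record

  # Illustrative stand-in for brod's message record. The field list is an
  # assumption for this example; the real definition lives in brod.
  Record.defrecord(:kafka_message, offset: nil, key: nil, value: nil)

  # Convert the record to a map and add the topic and partition fields.
  def to_map(topic, partition, msg) do
    msg
    |> kafka_message()
    |> Map.new()
    |> Map.merge(%{topic: topic, partition: partition})
  end
end

# Build a record by hand and convert it.
msg = {:kafka_message, 42, "user-1", "payload"}
IO.inspect(RecordToMapSketch.to_map("events", 0, msg))
```

Calling the generated kafka_message macro with a record returns a keyword list of its fields, which is why the result can be turned into a map directly.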

After the message map is built, your message handler module's handle_message function is called.

If async is false:

  • Your message handler module's handle_message/1 function will be called with the message
  • The Consumer will block and wait for your handle_message function to complete and then automatically acknowledge the message as processed.
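
A minimal sketch of a synchronous handler is shown here. The module name MyApp.SyncMessageHandler is hypothetical, and the :key and :value fields are assumed to be present on the message map; only :topic and :partition are named in the text above.

```elixir
defmodule MyApp.SyncMessageHandler do
  # Hypothetical handler module. Called once per message when async is false.
  # :topic and :partition are the fields added to the map; :key and :value
  # are assumed here and read defensively with Access.
  def handle_message(%{topic: topic, partition: partition} = message) do
    IO.puts("#{topic}/#{partition}: #{inspect(message[:key])} => #{inspect(message[:value])}")

    # Returning :ok lets the Consumer acknowledge the message and move on.
    :ok
  end
end
```

Because the Consumer waits for this function to return, slow work here directly slows message intake.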

If async is true:

  • Your message handler module's handle_message/2 function will be called with the pid of the running Consumer process and the message.
  • Message intake on the Consumer will not wait for your handle_message/2 to complete and will not automatically acknowledge the message as processed.
  • Once you've processed the message you will need to call Kaffe.Consumer.ack/2 with the pid and message.
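
The async flow above might look like the following sketch. The module name MyApp.AsyncMessageHandler and the process/1 helper are hypothetical; Kaffe.Consumer.ack/2 is the function documented on this page.

```elixir
defmodule MyApp.AsyncMessageHandler do
  # Hypothetical handler module. Called when async is true.
  def handle_message(pid, message) do
    # Hand the work to a separate process so message intake is not blocked.
    Task.start(fn ->
      process(message)
      # Manual acknowledgement, as required in async mode.
      Kaffe.Consumer.ack(pid, message)
    end)

    :ok
  end

  # Placeholder for application-specific work.
  defp process(message), do: IO.inspect(message, label: "processing")
end
```

Note that if process/1 crashes in this sketch, ack/2 is never called for that message; handling that case is left to the application.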

init(consumer_group, list)

Initialize the consumer loop.

kafka_message(args \\ []) (macro)

kafka_message(record, args) (macro)

start_consumer_client(config)

Start a Kafka consumer

The consumer pulls in values from the Kaffe consumer configuration:

  • heroku_kafka_env - endpoints and SSL configuration will be pulled from ENV
  • endpoints - plaintext Kafka endpoints
  • consumer_group - the consumer group id (should be unique to your app)
  • topics - a list of Kafka topics to consume
  • message_handler - the module that will be called for each Kafka message
  • async_message_ack - if false, the Kafka offset is acknowledged automatically after the message is successfully processed
  • start_with_earliest_message - If true the worker will consume from the beginning of the topic when it first starts. This only affects consumer behavior before the consumer group starts recording its offsets in Kafka.
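
As an illustration, a consumer configuration using the keys above could look like this sketch. The host name, topic, group id, and handler module are placeholder values, and placing the options under a consumer key of the :kaffe application is an assumption to check against your Kaffe version.

```elixir
# config/config.exs
import Config

config :kaffe,
  consumer: [
    # Placeholder endpoint, written as [hostname: port].
    endpoints: [kafka: 9092],
    # Placeholder topic and group id.
    topics: ["interesting-topic"],
    consumer_group: "your-app-consumer-group",
    # Hypothetical handler module that defines handle_message/1.
    message_handler: MyApp.MessageHandler,
    # false selects automatic (sync) acknowledgement.
    async_message_ack: false,
    start_with_earliest_message: true
  ]
```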

Note: If async_message_ack is true then you'll need to call ack/2 to acknowledge Kafka messages as processed.

Only use async processing if your application's processing flow absolutely needs it. With automatic (sync) acknowledgement, the message flow from Kaffe.Consumer has backpressure from your system. With manual (async) acknowledgement you can process messages faster, but you take on the burden of ensuring no messages are lost.