misha_cafex v0.0.6 Cafex.Consumer.Manager
This module is the main manager for a high-level Kafka consumer.
Structure
The manager works together with an offset manager and a group manager to manage the consumer workers.
The group manager handles client assignment via Kafka (0.9) or ZooKeeper. All consumers in a group elect a group leader; the leader collects information about all the other consumers in the group and balances the partitions among them.
The offset manager is responsible for the workers' offset commits and fetches. It buffers offset commit requests to improve throughput.
Options
These options must not be omitted, except :client_id.
:client_id - Optional; the default client_id is "cafex".
:handler - Worker handler module.
:brokers - List of Kafka brokers.
:lock - Which lock implementation the worker uses. The default is :consul; the other option is :zookeeper.
:group_manager - The default group manager is :kafka, which requires a Kafka server of version 0.9.x or above.
:offset_storage - Where to store the consumer's offsets. The default is :kafka; the other option is :zookeeper.
:auto_commit
:auto_commit_interval
:auto_commit_max_buffers
:auto_offset_reset
:fetch_wait_time
:fetch_min_bytes
:fetch_max_bytes
:zookeeper
These options for start_link/3 can be put under the :cafex key in the config/config.exs file:
config :cafex, :myconsumer,
  client_id: "cafex",
  brokers: [{"192.168.99.100", 9092}, {"192.168.99.101", 9092}],
  zookeeper: [
    servers: [{"192.168.99.100", 2181}],
    path: "/cafex"
  ],
  handler: {MyConsumer, []}
Then start the manager directly, or start it in your supervisor tree:
Cafex.Consumer.Manager.start_link(:myconsumer, "interested_topic")
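For the supervisor-tree variant, the following is a minimal sketch rather than the library's prescribed setup. It assumes Elixir 1.5 or later (for child-spec maps and Supervisor.init/2) and reuses the same arguments as the direct call above. MyApp.ConsumerSupervisor is a hypothetical module name.

```elixir
defmodule MyApp.ConsumerSupervisor do
  # Hypothetical supervisor module; the name is an assumption for illustration.
  use Supervisor

  def start_link(arg \\ []) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    children = [
      # Explicit child-spec map: the supervisor will call
      # Cafex.Consumer.Manager.start_link(:myconsumer, "interested_topic"),
      # mirroring the direct call shown above.
      %{
        id: :myconsumer,
        start: {Cafex.Consumer.Manager, :start_link, [:myconsumer, "interested_topic"]}
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Calling MyApp.ConsumerSupervisor.start_link() from your application's start callback would then bring up the manager under supervision. The :one_for_one strategy is a choice made for this sketch: only the failed child is restarted.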
Summary
Types
Options used by the start_link/3 functions
Types
consul :: [consul_option]
consul_option ::
  {:ttl, integer} |
  {:delay_lock, integer} |
  {:behavior, atom}
option ::
  {:client_id, client_id} |
  {:topic, String.t} |
  {:handler, Cafex.Consumer.Worker.handler} |
  {:brokers, [Cafex.broker]} |
  {:fetch_wait_time, integer} |
  {:fetch_min_bytes, integer} |
  {:fetch_max_bytes, integer} |
  {:auto_commit, boolean} |
  {:auto_commit_interval, integer} |
  {:auto_commit_max_buffers, integer} |
  {:auto_offset_reset, :earliest | :latest} |
  {:lock, :consul | :zookeeper} |
  {:group_manager, :kafka | :zookeeper} |
  {:offset_storage, :kafka | :zookeeper} |
  {:zookeeper, zookeeper} |
  {:consul, consul}
zookeeper_option ::
  {:servers, [Cafex.server]} |
  {:path, String.t} |
  {:timeout, non_neg_integer}
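As a rough illustration of how these types map onto configuration, the fragment below sets several of the typed options in config/config.exs. It is a sketch under assumptions: every value is illustrative rather than a documented default, and the unit of :timeout is not stated in this section.

```elixir
# On Elixir versions before 1.9, `use Mix.Config` takes the place of this import.
import Config

config :cafex, :myconsumer,
  client_id: "cafex",
  brokers: [{"192.168.99.100", 9092}],
  handler: {MyConsumer, []},
  # Each value below comes from the unions in the `option` type above.
  lock: :zookeeper,
  group_manager: :kafka,
  offset_storage: :kafka,
  auto_commit: true,
  auto_offset_reset: :latest,
  # Keys from `zookeeper_option`.
  zookeeper: [
    servers: [{"192.168.99.100", 2181}],
    path: "/cafex",
    # Illustrative value only; the unit is not stated in this section.
    timeout: 5000
  ]
```

With lock: :consul, a consul: keyword list would presumably carry the consul_option keys (:ttl, :delay_lock, :behavior) in the same way.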
Functions
Specs
start_link(name :: atom, options) :: GenServer.on_start
Start a consumer manager.
Arguments
name - Consumer group name
topic - The topic name which will be consumed
options - Starting options
Options
See the Options section above.