misha_cafex v0.0.6 Cafex.Consumer.Manager

This module is the main manager for a high-level Kafka consumer.

Structure

The manager works together with an offset manager and a group manager to manage the consumer workers.

The group manager handles client assignment via Kafka (0.9) or ZooKeeper. All consumers in a group elect a group leader; the leader collects information about the other consumers in the group and balances the partitions among them.
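As a rough illustration of what "balancing the partitions" can mean, the sketch below spreads partitions over consumers in round-robin order. It is not Cafex's actual algorithm, which this documentation does not describe; the module name BalanceSketch and the consumer ids are hypothetical.

```elixir
defmodule BalanceSketch do
  @moduledoc """
  Hypothetical illustration of a group leader spreading partitions over
  consumers. This is NOT Cafex's real assignment logic.
  """

  @doc """
  Assigns each partition to a consumer in round-robin order.
  Returns a map of consumer id => list of partitions.
  """
  def assign(consumers, partitions) when consumers != [] do
    # Sort both sides so every member would compute the same assignment.
    sorted = Enum.sort(consumers)
    empty = Map.new(sorted, fn consumer -> {consumer, []} end)

    partitions
    |> Enum.sort()
    |> Enum.zip(Stream.cycle(sorted))
    |> Enum.reduce(empty, fn {partition, consumer}, acc ->
      Map.update!(acc, consumer, &(&1 ++ [partition]))
    end)
  end
end

assignment = BalanceSketch.assign(["c2", "c1"], [0, 1, 2, 3, 4])
# assignment is %{"c1" => [0, 2, 4], "c2" => [1, 3]}
```

Sorting the inputs keeps the result deterministic, so a re-run after a consumer joins or leaves only depends on the current membership.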

The offset manager is responsible for committing and fetching the workers' offsets. It buffers offset commit requests to improve throughput.
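The buffering idea can be sketched as a small pure data structure: commit requests accumulate, only the highest offset per partition is kept, and everything is flushed in one batch once a limit is reached. This is a minimal sketch under assumptions, not Cafex's offset manager; the module OffsetBufferSketch and its max_buffers field are hypothetical, and a real implementation would presumably also flush on a timer.

```elixir
defmodule OffsetBufferSketch do
  @moduledoc """
  Hypothetical commit buffer; NOT Cafex's offset manager.
  """

  defstruct pending: %{}, count: 0, max_buffers: 3

  def new(max_buffers), do: %__MODULE__{max_buffers: max_buffers}

  @doc """
  Records one commit request.

  Returns `{:buffered, buffer}` while below the limit, or
  `{:flush, offsets, empty_buffer}` once `max_buffers` requests have
  accumulated. `offsets` maps partition => highest offset seen.
  """
  def commit(%__MODULE__{} = buf, partition, offset) do
    # Keep only the highest offset per partition.
    pending = Map.update(buf.pending, partition, offset, &max(&1, offset))
    count = buf.count + 1

    if count >= buf.max_buffers do
      {:flush, pending, %__MODULE__{buf | pending: %{}, count: 0}}
    else
      {:buffered, %__MODULE__{buf | pending: pending, count: count}}
    end
  end
end

buf = OffsetBufferSketch.new(3)
{:buffered, buf} = OffsetBufferSketch.commit(buf, 0, 10)
{:buffered, buf} = OffsetBufferSketch.commit(buf, 0, 11)
{:flush, offsets, _empty} = OffsetBufferSketch.commit(buf, 1, 5)
# Three requests collapse into one batch: %{0 => 11, 1 => 5}
```

Batching this way trades a slightly later commit for fewer round trips to the offset store.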

Options

None of these options may be omitted, except :client_id.

  • :client_id Optional, default client_id is “cafex”
  • :handler Worker handler module
  • :brokers Kafka brokers list
  • :lock Indicates which lock implementation the worker will use; default is :consul, the other option is :zookeeper
  • :group_manager Default group manager is :kafka, which requires a Kafka server version 0.9.x or above.
  • :offset_storage Indicates where to store the consumer’s offset; default is :kafka, the other option is :zookeeper
  • :auto_commit
  • :auto_commit_interval
  • :auto_commit_max_buffers
  • :auto_offset_reset
  • :fetch_wait_time
  • :fetch_min_bytes
  • :fetch_max_bytes
  • :zookeeper

These options for start_link/3 can be put under the :cafex key in the config/config.exs file:

  config :cafex, :myconsumer,
    client_id: "cafex",
    brokers: [{"192.168.99.100", 9092}, {"192.168.99.101", 9092}],
    zookeeper: [
      servers: [{"192.168.99.100", 2181}],
      path: "/cafex"
    ],
    handler: {MyConsumer, []}

Then start the manager directly, or start it in your supervisor tree:

  Cafex.Consumer.Manager.start_link(:myconsumer, "interested_topic")
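For the supervisor-tree case, a minimal sketch might look like the following. The supervisor module MyApp.ConsumerSupervisor is hypothetical, and the arguments passed to the manager are copied from the call above; adjust them to the start_link arity your Cafex version actually exposes.

```elixir
defmodule MyApp.ConsumerSupervisor do
  # Hypothetical supervisor module; not part of Cafex.
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    children = [
      %{
        id: :myconsumer,
        # Same arguments as the direct call shown above (assumption:
        # they match the start_link arity of the installed version).
        start:
          {Cafex.Consumer.Manager, :start_link,
           [:myconsumer, "interested_topic"]}
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

With :one_for_one, a crash of the manager restarts only the manager and leaves any sibling processes untouched.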

Summary

Types

consul_option ::
  {:ttl, integer} |
  {:delay_lock, integer} |
  {:behavior, atom}
option ::
  {:client_id, client_id} |
  {:topic, String.t} |
  {:handler, Cafex.Consumer.Worker.handler} |
  {:brokers, [Cafex.broker]} |
  {:fetch_wait_time, integer} |
  {:fetch_min_bytes, integer} |
  {:fetch_max_bytes, integer} |
  {:auto_commit, boolean} |
  {:auto_commit_interval, integer} |
  {:auto_commit_max_buffers, integer} |
  {:auto_offset_reset, :earliest | :latest} |
  {:lock, :consul | :zookeeper} |
  {:group_manager, :kafka | :zookeeper} |
  {:offset_storage, :kafka | :zookeeper} |
  {:zookeeper, zookeeper} |
  {:consul, consul}
options :: [option]

Options used by the start_link/3 functions

zookeeper_option ::
  {:servers, [Cafex.server]} |
  {:path, String.t} |
  {:timeout, non_neg_integer}
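Read together, these types describe a keyword list with a nested keyword list under :zookeeper. The sketch below builds such a list by hand. All values are illustrative only: this documentation does not state units or sensible ranges for the numeric options, and MyConsumer is the hypothetical handler module from the configuration example.

```elixir
# Illustrative values only; units and defaults are not stated in the docs.
options = [
  client_id: "cafex",
  handler: {MyConsumer, []},
  brokers: [{"192.168.99.100", 9092}],
  lock: :zookeeper,
  group_manager: :kafka,
  offset_storage: :kafka,
  auto_commit: true,
  auto_offset_reset: :latest,
  zookeeper: [
    servers: [{"192.168.99.100", 2181}],
    path: "/cafex",
    timeout: 5_000
  ]
]

# Nested options are reached with ordinary Keyword access.
zk_path = options |> Keyword.fetch!(:zookeeper) |> Keyword.fetch!(:path)
```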

Functions

start_link(name, opts \\ [])

Specs

start_link(name :: atom, options) :: GenServer.on_start

Start a consumer manager.

Arguments

  • name Consumer group name
  • topic The topic name which will be consumed
  • options Starting options

Options

See the Options section above.

stop(pid)