ex_nsq v1.1.0 NSQ.Consumer

A consumer is a process that creates connections to NSQD to receive messages for a specific topic and channel. It has three primary functions:

  1. Provide a simple interface for a user to set up and configure message handlers.
  2. Balance RDY across all available connections.
  3. Add/remove connections as they are discovered.

Simple Interface

In standard practice, the only function a user needs to know is NSQ.Consumer.Supervisor.start_link/3. It takes a topic, a channel, and an NSQ.Config struct; the struct's possible values are defined and explained in nsq/config.ex.

{:ok, consumer} = NSQ.Consumer.Supervisor.start_link("my-topic", "my-channel", %NSQ.Config{
  nsqlookupds: ["127.0.0.1:6751", "127.0.0.1:6761"],
  message_handler: fn(body, msg) ->
    # handle the message
    :ok
  end
})

Message handler return values

The return value of the message handler determines how we will respond to NSQ.

:ok

The message was handled and should not be requeued. This sends a FIN command to NSQD.

:req

This message should be requeued. With no delay specified, the delay is calculated exponentially from the number of attempts. Refer to Message.calculate_delay for the exact formula.
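To make "exponentially based on the number of attempts" concrete, here is a minimal sketch of a delay that doubles per attempt. It is not the formula from Message.calculate_delay: the module name DelaySketch, the 1-second base, the 2-minute cap, and the use of milliseconds are all assumptions chosen for illustration.

```elixir
defmodule DelaySketch do
  # Hypothetical constants; ex_nsq's real values may differ.
  @base_ms 1_000
  @max_ms 120_000

  # Doubles the delay for every attempt after the first, capped at @max_ms.
  def delay_for(attempts) when is_integer(attempts) and attempts >= 1 do
    min(@base_ms * Integer.pow(2, attempts - 1), @max_ms)
  end
end
```

Under these assumptions the first attempt waits 1_000, the fourth 8_000, and later attempts settle at the cap.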

{:req, delay}

This message should be requeued. Use the delay specified. A positive integer is expected.

{:req, delay, backoff}

This message should be requeued. Use the delay specified. If backoff is truthy, the consumer will temporarily set RDY to 0 in order to stop receiving messages. It will use a standard strategy to resume from backoff mode.

This type of return value is only meant for exceptional cases, such as internal network partitions, where stopping message handling briefly could be beneficial. Only use this return value if you know what you’re doing.

If a message handler raises an unhandled exception, the message is requeued automatically and the consumer enters backoff mode.
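The return values above can be sketched as one handler with a clause per outcome. The module name HandlerSketch, the message bodies it matches on, and the delay of 5_000 are hypothetical; this section does not state the unit of the delay, only that it is a positive integer.

```elixir
defmodule HandlerSketch do
  # Handled successfully: the consumer sends FIN.
  def handle("ok", _msg), do: :ok

  # Requeue and let the consumer pick the delay from the attempt count.
  def handle("retry", _msg), do: :req

  # Requeue with an explicit delay (positive integer).
  def handle("retry-later", _msg), do: {:req, 5_000}

  # Requeue and ask the consumer to back off (RDY 0) for a while.
  # Reserved for exceptional cases such as a downstream outage.
  def handle("downstream-outage", _msg), do: {:req, 5_000, true}

  # Anything else is treated as handled in this sketch.
  def handle(_other, _msg), do: :ok
end
```

Because the handler is an arity-2 function, it could be passed in the config as message_handler: &HandlerSketch.handle/2 in place of the anonymous function shown earlier.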

NSQ.Message.touch(msg)

NSQ.Config has a property called msg_timeout, which configures the NSQD server to wait that long before assuming the message failed and requeueing it. If you expect your message handler to take longer than that, you can call NSQ.Message.touch(msg) from the message handler to reset the server-side timer.
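As a rough sketch of that pattern, the handler below works through a job in chunks and touches the message after each one, so the server-side timer is reset before msg_timeout can expire. SlowHandler, its chunked input, and the injectable touch argument are hypothetical and exist only to keep the example self-contained; the default simply points at NSQ.Message.touch/1.

```elixir
defmodule SlowHandler do
  # `touch` defaults to NSQ.Message.touch/1; it is a parameter only so the
  # sketch can be exercised without a running NSQD.
  def handle(chunks, msg, touch \\ &NSQ.Message.touch/1) do
    Enum.each(chunks, fn chunk ->
      process(chunk)
      # Reset the server-side timeout after each unit of work.
      touch.(msg)
    end)

    :ok
  end

  # Placeholder for the real, slow work.
  defp process(_chunk), do: :ok
end
```

A message handler could then call something like SlowHandler.handle(String.split(body, "\n"), msg), assuming the body is a newline-separated list of work items.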

NSQ.Consumer.change_max_in_flight(consumer, max_in_flight)

If you’d like to manually change the max in flight of a consumer, use this function. It causes the consumer’s connections to rebalance to the new value. If the new max_in_flight is smaller than the number of messages currently in flight, the consumer must wait for the existing handlers to finish or requeue before it can fully rebalance.
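To illustrate what balancing a max_in_flight value across connections can mean, the sketch below splits a total evenly over a list of connection IDs and hands any remainder to the first connections. This is not ex_nsq's actual redistribution logic; RdySketch and its even-split strategy are assumptions, and it ignores cases a real client must handle, such as idle connections or messages already in flight.

```elixir
defmodule RdySketch do
  # Returns a map of connection ID => RDY count whose values sum to max_in_flight.
  def distribute(max_in_flight, conn_ids)
      when is_integer(max_in_flight) and max_in_flight >= 0 and conn_ids != [] do
    n = length(conn_ids)
    base = div(max_in_flight, n)
    extra = rem(max_in_flight, n)

    conn_ids
    |> Enum.with_index()
    |> Map.new(fn {id, i} ->
      # The first `extra` connections get one more than the base share.
      {id, if(i < extra, do: base + 1, else: base)}
    end)
  end
end
```

With three connections and a max_in_flight of 10, this split gives 4, 3, and 3. When max_in_flight is smaller than the number of connections, some connections receive 0 in this sketch.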

Types

connection()
connection() :: {String.t, pid}

A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection.

cons_state()
cons_state() :: %{conn_sup_pid: pid, config: NSQ.Config.t, conn_info_pid: pid}

A map, but we can be more specific by asserting some entries that should be set for a consumer’s state map.

host_with_port()
host_with_port() :: {String.t, integer}

A tuple with a host and a port.

state()
state() :: %{conn_sup_pid: pid, config: NSQ.Config.t, conn_info_pid: pid}

Functions

change_max_in_flight(sup_pid, new_max_in_flight)
change_max_in_flight(pid, integer) :: {:ok, :ok}

Public function to change max_in_flight for a consumer. The new value will be balanced across connections.

conn_info(sup_pid)

event_manager(sup_pid)

If the event manager is not defined in NSQ.Config, one is generated. To attach event handlers on the fly, use a syntax like NSQ.Consumer.event_manager(consumer) |> GenEvent.add_handler(MyHandler, [])
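A handler passed to GenEvent.add_handler is a module that uses GenEvent. The sketch below is one possible MyHandler that records every event it receives. The shape of the events ex_nsq emits is not described in this section, so the handler treats each event as opaque; the :events call used to read them back is a hypothetical addition. Note that GenEvent is deprecated in recent Elixir releases and compiling this emits a deprecation warning.

```elixir
defmodule MyHandler do
  use GenEvent

  # State is the list passed as the third argument to add_handler ([] here),
  # used as a newest-first log of received events.
  def handle_event(event, seen), do: {:ok, [event | seen]}

  # Hypothetical call for reading back the recorded events, oldest first.
  def handle_call(:events, seen), do: {:ok, Enum.reverse(seen), seen}
end
```

Once attached, the recorded events could be read with GenEvent.call(manager, MyHandler, :events), where manager is the pid returned by NSQ.Consumer.event_manager(consumer).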

get(sup_pid)
get(pid) :: pid

NSQ.Consumer.Supervisor.start_link returns the supervisor pid so that we can effectively recover from consumer crashes. This function takes the supervisor pid and returns the consumer pid. We use this for public facing functions so that the end user can simply target the supervisor, e.g. NSQ.Consumer.change_max_in_flight(supervisor_pid, 100). Not for external use.

get_state(cons)
get_state(pid) :: {:ok, cons_state}

Called from tests to assert correct consumer state. Not for external use.

handle_call(msg, arg2, state)
handle_call(:redistribute_rdy, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:discover_nsqds, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:delete_dead_connections, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call({:start_stop_continue_backoff, atom}, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call({:update_rdy, connection, integer}, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:state, {reference, pid}, cons_state) :: {:reply, cons_state, cons_state}
handle_call({:max_in_flight, integer}, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:event_manager, any, cons_state) :: {:reply, pid, cons_state}
handle_call(:conn_info, any, cons_state) :: {:reply, map, cons_state}

Called to observe all connection stats. For debugging or reporting purposes.

handle_cast(msg, state)
handle_cast(:resume, cons_state) :: {:noreply, cons_state}
handle_cast({:maybe_update_rdy, host_with_port}, cons_state) :: {:noreply, cons_state}

Called from NSQ.Connection.handle_cast({:nsq_msg, _}, _) after each message is received. Not for external use.

init(args)
init(map) :: {:ok, cons_state}

On init, we create a connection for each NSQD instance discovered, and set up loops for discovery and RDY redistribution.

start_link(topic, channel, config, opts \\ [])
start_link(String.t, String.t, NSQ.Config.t, list) :: {:ok, pid}

Starts a Consumer process, called via the supervisor.

starved?(sup_pid)