ex_nsq v1.1.0 NSQ.Consumer
A consumer is a process that creates connections to NSQD to receive messages for a specific topic and channel. It has three primary functions:
- Provide a simple interface for a user to setup and configure message handlers.
- Balance RDY across all available connections.
- Add/remove connections as they are discovered.
Simple Interface
In standard practice, the only function a user should need to know about is
NSQ.Consumer.Supervisor.start_link/3
. It takes a topic, a channel, and an
NSQ.Config struct, which has possible values defined and explained in
nsq/config.ex.
{:ok, consumer} = NSQ.Consumer.Supervisor.start_link("my-topic", "my-channel", %NSQ.Config{
nsqlookupds: ["127.0.0.1:6751", "127.0.0.1:6761"],
message_handler: fn(body, msg) ->
# handle them message
:ok
end
})
Message handler return values
The return value of the message handler determines how we will respond to NSQ.
:ok
The message was handled and should not be requeued. This sends a FIN command to NSQD.
:req
This message should be requeued. With no delay specified, it will calculate delay exponentially based on the number of attempts. Refer to Message.calculate_delay for the exact formula.
{:req, delay}
This message should be requeued. Use the delay specified. A positive integer is expected.
{:req, delay, backoff}
This message should be requeued. Use the delay specified. If backoff
is
truthy, the consumer will temporarily set RDY to 0 in order to stop receiving
messages. It will use a standard strategy to resume from backoff mode.
This type of return value is only meant for exceptional cases, such as internal network partitions, where stopping message handling briefly could be beneficial. Only use this return value if you know what you’re doing.
A message handler that throws an unhandled exception will automatically requeue and enter backoff mode.
NSQ.Message.touch(msg)
NSQ.Config has a property called msg_timeout, which configures the NSQD
server to wait that long before assuming the message failed and requeueing
it. If you expect your message handler to take longer than that, you can call
NSQ.Message.touch(msg)
from the message handler to reset the server-side
timer.
NSQ.Consumer.change_max_in_flight(consumer, max_in_flight)
If you’d like to manually change the max in flight of a consumer, use this
function. It will cause the consumer’s connections to rebalance to the new
value. If the new max_in_flight
is smaller than the current messages in
flight, it must wait for the existing handlers to finish or requeue before
it can fully rebalance.
Link to this section Summary
Types
A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection
A map, but we can be more specific by asserting some entries that should be set for a connection’s state map
A tuple with a host and a port
Functions
Public function to change max_in_flight
for a consumer. The new value will
be balanced across connections
If the event manager is not defined in NSQ.Config, it will be generated. So
if you want to attach event handlers on the fly, you can use a syntax like
NSQ.Consumer.event_manager(consumer) |> GenEvent.add_handler(MyHandler, [])
NSQ.Consumer.Supervisor.start_link returns the supervisor pid so that we can
effectively recover from consumer crashes. This function takes the supervisor
pid and returns the consumer pid. We use this for public facing functions so
that the end user can simply target the supervisor, e.g.
NSQ.Consumer.change_max_in_flight(supervisor_pid, 100)
. Not for external
use
Called from tests to assert correct consumer state. Not for external use
Called to observe all connection stats. For debugging or reporting purposes
Called from NSQ.Connection.handle_cast({:nsq_msg, _}, _)
after each message
is received. Not for external use
On init, we create a connection for each NSQD instance discovered, and set up loops for discovery and RDY redistribution
Starts a Consumer process, called via the supervisor
Link to this section Types
A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection.
cons_state() :: %{conn_sup_pid: pid, config: NSQ.Config.t, conn_info_pid: pid}
A map, but we can be more specific by asserting some entries that should be set for a connection’s state map.
A tuple with a host and a port.
Link to this section Functions
change_max_in_flight(pid, integer) :: {:ok, :ok}
Public function to change max_in_flight
for a consumer. The new value will
be balanced across connections.
If the event manager is not defined in NSQ.Config, it will be generated. So
if you want to attach event handlers on the fly, you can use a syntax like
NSQ.Consumer.event_manager(consumer) |> GenEvent.add_handler(MyHandler, [])
NSQ.Consumer.Supervisor.start_link returns the supervisor pid so that we can
effectively recover from consumer crashes. This function takes the supervisor
pid and returns the consumer pid. We use this for public facing functions so
that the end user can simply target the supervisor, e.g.
NSQ.Consumer.change_max_in_flight(supervisor_pid, 100)
. Not for external
use.
Called from tests to assert correct consumer state. Not for external use.
handle_call(:redistribute_rdy, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:discover_nsqds, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:delete_dead_connections, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call({:start_stop_continue_backoff, atom}, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call({:update_rdy, connection, integer}, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:state, {reference, pid}, cons_state) :: {:reply, cons_state, cons_state}
handle_call({:max_in_flight, integer}, {reference, pid}, cons_state) :: {:reply, :ok, cons_state}
handle_call(:event_manager, any, cons_state) :: {:reply, pid, cons_state}
handle_call(:conn_info, any, cons_state) :: {:reply, map, cons_state}
Called to observe all connection stats. For debugging or reporting purposes.
handle_cast(:resume, cons_state) :: {:noreply, cons_state}
handle_cast({:maybe_update_rdy, host_with_port}, cons_state) :: {:noreply, cons_state}
Called from NSQ.Connection.handle_cast({:nsq_msg, _}, _)
after each message
is received. Not for external use.
On init, we create a connection for each NSQD instance discovered, and set up loops for discovery and RDY redistribution.
start_link(String.t, String.t, NSQ.Config.t, list) :: {:ok, pid}
Starts a Consumer process, called via the supervisor.