ex_nsq v1.1.0 NSQ.Consumer.RDY

Consumers have a lot of logic around calculating and distributing RDY. This is where that goes!

Summary

Functions

maybe_update(cons, conn, cons_state)
If we’re not in backoff mode and we’ve hit a “trigger point” to update RDY, then go ahead and update RDY. Not for external use.

redistribute(cons, cons_state)
This is only triggered in unusual cases: when we’re in backoff, or when there are more connections than max in flight. It randomly sets RDY to 0 on some connections and 1 on others so that every connection is guaranteed to eventually process messages.

redistribute_loop(cons)
Initialized from NSQ.Consumer.Supervisor. Sends the consumer a message on a fixed interval.

retry(cons, conn, count, cons_state)
Delay for a configured interval, then call RDY.update.

transmit(conn, count, cons_state)
Send a RDY command for the given connection.

update(cons, conn, new_rdy, cons_state)
Try to update RDY for a given connection, taking configuration and the current state into account. Not for external use.

Functions

maybe_update(cons, conn, cons_state)

If we’re not in backoff mode and we’ve hit a “trigger point” to update RDY, then go ahead and update RDY. Not for external use.
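
A minimal sketch of what a “trigger point” check could look like, assuming a rule similar to the one used by other NSQ client libraries: refresh RDY when the remaining count has dropped to 1 or below, has fallen under a quarter of the last RDY sent, or exceeds the connection’s share of max in flight. The module name, function name, arguments, and thresholds are illustrative assumptions, not ex_nsq’s actual code.

```elixir
defmodule RdyTriggerSketch do
  # Hypothetical helper, not part of ex_nsq.
  #   remaining    - RDY count still outstanding on the connection
  #   last_rdy     - value sent in the most recent RDY command
  #   per_conn_max - this connection's share of max in flight
  #   in_backoff   - whether the consumer is currently backing off

  # Never update while in backoff.
  def should_update?(_remaining, _last_rdy, _per_conn_max, true), do: false

  # Outside backoff, update only when one of the assumed trigger
  # conditions holds.
  def should_update?(remaining, last_rdy, per_conn_max, false) do
    remaining <= 1 or
      remaining < last_rdy / 4 or
      (per_conn_max > 0 and per_conn_max < remaining)
  end
end
```

With these assumed thresholds, a connection that last sent RDY 10 and still has 5 outstanding is left alone, while one with 2 outstanding gets a fresh RDY.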

maybe_update!(cons, conn, cons_state)

redistribute(cons, cons_state)
redistribute(pid, NSQ.Consumer.state) :: {:ok, NSQ.Consumer.state}

This is only triggered in unusual cases: when we’re in backoff, or when there are more connections than max in flight. It randomly sets RDY to 0 on some connections and 1 on others so that every connection is guaranteed to eventually process messages.
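
The random 0/1 assignment can be sketched as follows. This is an illustration under stated assumptions rather than ex_nsq’s implementation: `RdyRedistributeSketch`, `assign/2`, and the idea of passing connection ids plus a numeric budget are all hypothetical.

```elixir
defmodule RdyRedistributeSketch do
  # Hypothetical helper, not ex_nsq's implementation.
  # Returns a map of connection id => RDY (0 or 1). At most
  # `max_in_flight` connections receive RDY 1, and the winners are
  # re-drawn at random on every call.
  def assign(conn_ids, max_in_flight)
      when is_integer(max_in_flight) and max_in_flight >= 0 do
    {active, idle} =
      conn_ids
      |> Enum.shuffle()
      |> Enum.split(max_in_flight)

    Map.merge(Map.new(idle, &{&1, 0}), Map.new(active, &{&1, 1}))
  end
end
```

Because the shuffle is repeated on each call, a connection that drew RDY 0 in one round has a chance of drawing RDY 1 in a later round, which is what lets every connection eventually receive messages.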

redistribute!(cons, cons_state)

redistribute_loop(cons)
redistribute_loop(pid) :: any

Initialized from NSQ.Consumer.Supervisor. Sends the consumer a message on a fixed interval.
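
A fixed-interval loop of this kind might look like the sketch below. The message name `:redistribute_rdy`, the `interval_ms` argument, and the use of a plain `send/2` are assumptions made for illustration; the real function takes only the consumer pid and may talk to the consumer differently.

```elixir
defmodule RedistributeLoopSketch do
  # Hypothetical stand-in for the real loop: sends `:redistribute_rdy`
  # (an assumed message name) to `consumer`, sleeps for `interval_ms`
  # milliseconds, and repeats forever.
  def loop(consumer, interval_ms) do
    send(consumer, :redistribute_rdy)
    Process.sleep(interval_ms)
    loop(consumer, interval_ms)
  end
end
```

Since the loop never returns, it would normally run in its own process, for example one started under a supervisor so that it stops and restarts together with the consumer.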

retry(cons, conn, count, cons_state)

Delay for a configured interval, then call RDY.update.
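
The delay-then-update pattern can be sketched like this. `RdyRetrySketch` and `retry_after/2` are hypothetical names, and running the delay in a separate task is an assumption; this page does not say whether ex_nsq blocks the caller or spawns a process.

```elixir
defmodule RdyRetrySketch do
  # Hypothetical helper: waits `delay_ms` milliseconds, then invokes
  # `update_fun`. The wait happens in a separate process so the caller
  # is not blocked. Returns {:ok, pid} of the started task.
  def retry_after(delay_ms, update_fun) when is_function(update_fun, 0) do
    Task.start(fn ->
      Process.sleep(delay_ms)
      update_fun.()
    end)
  end
end
```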

transmit(conn, count, cons_state)

Send a RDY command for the given connection.
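
On the wire, the NSQ TCP protocol expresses RDY as the text line `RDY <count>\n`. The sketch below only builds those bytes; `RdyWireSketch` and `encode/1` are hypothetical names, and how ex_nsq actually writes to the socket is not shown on this page.

```elixir
defmodule RdyWireSketch do
  # Builds the bytes of an NSQ RDY command: "RDY <count>\n".
  # Hypothetical helper, not part of ex_nsq.
  def encode(count) when is_integer(count) and count >= 0 do
    "RDY #{count}\n"
  end
end
```

Given an already-open TCP socket (assumed here), the result could be written with `:gen_tcp.send(socket, RdyWireSketch.encode(5))`.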

update(cons, conn, new_rdy, cons_state)

Try to update RDY for a given connection, taking configuration and the current state into account. Not for external use.

update!(cons, conn, new_rdy, cons_state)