ex_nsq v1.1.0 NSQ.Consumer.RDY
Consumers need a fair amount of logic to calculate RDY counts and distribute them across connections. That logic lives in this module.
Functions
maybe_update(pid, NSQ.Consumer.connection, NSQ.Consumer.state) :: {:ok, NSQ.Consumer.state}
If we’re not in backoff mode and we’ve hit a “trigger point” to update RDY, then go ahead and update RDY. Not for external use.
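As a rough illustration of what a "trigger point" check could look like, here is a minimal sketch. The module `RdySketch.Trigger`, the function `should_update?/4`, and the specific thresholds are hypothetical; they are not taken from the ex_nsq source and only show the shape of the decision.

```elixir
defmodule RdySketch.Trigger do
  @moduledoc false

  # Hypothetical helper, not part of ex_nsq.
  #   in_backoff: whether the consumer is currently backing off
  #   remain:     RDY count the connection still has left
  #   last_rdy:   the last RDY value sent on that connection
  #   desired:    the per-connection RDY we would like to have
  @spec should_update?(boolean, non_neg_integer, non_neg_integer, non_neg_integer) :: boolean
  def should_update?(in_backoff, remain, last_rdy, desired) do
    cond do
      # Never top up RDY while backing off.
      in_backoff -> false
      # Almost out of RDY: refresh now (assumed threshold).
      remain <= 1 -> true
      # Under a quarter of the last RDY is left (assumed threshold).
      remain < last_rdy / 4 -> true
      # The desired RDY dropped below what is outstanding.
      desired > 0 and desired < remain -> true
      true -> false
    end
  end
end
```

For example, `RdySketch.Trigger.should_update?(false, 2, 10, 10)` returns `true` under these assumed thresholds, because 2 is less than a quarter of 10.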
redistribute(pid, NSQ.Consumer.state) :: {:ok, NSQ.Consumer.state}
This is only triggered in unusual cases: when the consumer is in backoff, or when there are more connections than max in flight. It randomly sets RDY to 0 on some connections and to 1 on others, so that every connection eventually gets a chance to process messages.
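The random 0/1 assignment described above might be sketched as follows. `RdySketch.Redistribute.assign/2` and its arguments are hypothetical names for illustration, and the sketch assumes connection IDs are unique; the real function works on consumer state rather than a plain list.

```elixir
defmodule RdySketch.Redistribute do
  @moduledoc false

  # Hypothetical helper, not part of ex_nsq.
  # Gives RDY 1 to at most `available` randomly chosen connections and
  # RDY 0 to the rest. Returns a map of conn_id => 0 | 1.
  @spec assign([term], non_neg_integer) :: %{term => 0 | 1}
  def assign(conn_ids, available) do
    {chosen, rest} =
      conn_ids
      |> Enum.shuffle()
      |> Enum.split(available)

    Map.merge(Map.new(rest, &{&1, 0}), Map.new(chosen, &{&1, 1}))
  end
end
```

Because the shuffle is repeated on every call, running this on a fixed interval gives each connection a turn at RDY 1 over time, which is the intent of the redistribution step.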
Initialized from NSQ.Consumer.Supervisor, sends the consumer a message on a fixed interval.
retry(pid, NSQ.Consumer.connection, integer, NSQ.Consumer.state) :: {:ok, NSQ.Consumer.state}
Delay for a configured interval, then call RDY.update.
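A delay-then-update step can be sketched with a `Task`. This is only an illustration: `RdySketch.Retry.retry_after/2` is a hypothetical name, `update_fun` stands in for the real update call, and whether ex_nsq blocks the caller or schedules the retry asynchronously is not stated here.

```elixir
defmodule RdySketch.Retry do
  @moduledoc false

  # Hypothetical helper, not part of ex_nsq.
  # Waits `delay_ms` milliseconds in a separate process, then runs
  # `update_fun`, which stands in for the real RDY update.
  def retry_after(delay_ms, update_fun)
      when is_integer(delay_ms) and delay_ms >= 0 and is_function(update_fun, 0) do
    Task.async(fn ->
      Process.sleep(delay_ms)
      update_fun.()
    end)
  end
end
```

A caller that needs the result can use `Task.await/1` on the returned task; a caller that does not care can ignore it.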
transmit(NSQ.Consumer.connection, integer, NSQ.Consumer.state) :: {:ok, NSQ.Consumer.state}
Send a RDY command for the given connection.
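In the NSQ TCP protocol, a RDY command is the text `RDY <count>` followed by a newline. The sketch below only builds those bytes; `RdySketch.Wire.rdy_command/1` is a hypothetical helper, and how ex_nsq encodes and writes the command to the socket internally is not shown here.

```elixir
defmodule RdySketch.Wire do
  @moduledoc false

  # Hypothetical helper, not part of ex_nsq.
  # Builds the wire form of a RDY command: "RDY <count>\n".
  @spec rdy_command(non_neg_integer) :: binary
  def rdy_command(count) when is_integer(count) and count >= 0 do
    "RDY #{count}\n"
  end
end
```

A count of 0 tells nsqd to stop sending messages on that connection, which is what backoff and redistribution rely on.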
update(pid, NSQ.Consumer.connection, integer, NSQ.Consumer.state) :: {:ok, NSQ.Consumer.state}
Try to update RDY for a given connection, taking configuration and the current state into account. Not for external use.
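One way "taking configuration and the current state into account" could play out is to clamp the requested RDY before sending it. The sketch below assumes two limits: a per-connection share of max in flight, and a server-side maximum RDY. The module, function names, and exact formula are hypothetical, not taken from ex_nsq.

```elixir
defmodule RdySketch.Clamp do
  @moduledoc false

  # Hypothetical helpers, not part of ex_nsq.

  # Share max_in_flight evenly across connections, giving each at least 1
  # but never more than max_in_flight itself.
  @spec per_conn_max_in_flight(non_neg_integer, pos_integer) :: non_neg_integer
  def per_conn_max_in_flight(max_in_flight, conn_count) when conn_count > 0 do
    max_in_flight
    |> div(conn_count)
    |> max(1)
    |> min(max_in_flight)
  end

  # Clamp a requested RDY to the per-connection share and to the server's
  # maximum RDY (assumed to be known from the connection handshake).
  @spec clamp(integer, non_neg_integer, pos_integer, non_neg_integer) :: non_neg_integer
  def clamp(requested, max_in_flight, conn_count, server_max_rdy) do
    requested
    |> min(per_conn_max_in_flight(max_in_flight, conn_count))
    |> min(server_max_rdy)
    |> max(0)
  end
end
```

With `max_in_flight` of 10 and 4 connections, each connection's share is 2, so a request for RDY 100 would be reduced to 2 under these assumptions.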