exq v0.13.0 Exq.Manager.Server

The Manager module is the main orchestrator for the system

It is also the entry point Pid process used by the client to interact with the Exq system.

It's responsibilities include:

  • Handle interaction with client and delegate to responsible sub-system
  • Initial Setup of Redis Connection (to be moved to supervisor?).
  • Setup and tracking of in-progress workers / jobs.
  • Poll Redis for new jobs for any queues that have available workers.
  • Handling of queue state and subscriptions (addition and removal)
  • Initial re-hydration of backup queue on system restart to handle any orphan jobs from last system stop.

The Manager is a GenServer with a timed process loop.

Options

  • :concurrency - Default max number of workers to use if not passed in for each queue.
  • :genserver_timeout - Timeout to use for GenServer calls.
  • :max_retries - Maximum number of times to retry a failed job
  • :name - Name of target registered process
  • :namespace - Redis namespace to store all data under. Defaults to "exq".
  • :queues - List of queues to monitor. Can be an array of queues names such as ["q1", "q2"], or array of tuples with queue and max number of concurrent workers: [{"q1", 1}, {"q2", 20}]. If only an array is passed in, system will use the default concurrency value for each queue.
  • :redis_timeout - Timeout to use for Redis commands.
  • :poll_timeout - How often to poll Redis for jobs.
  • :scheduler_enable - Whether scheduler / retry process should be enabled. This defaults to true. Note that is you turn this off, job retries will not be enqueued.
  • :scheduler_poll_timeout - How often to poll Redis for scheduled / retry jobs.

Redis Options (TODO - move to supervisor after refactor):

  • :host - Host name for Redis server (defaults to '127.0.0.1')
  • :port - Redis port (defaults to 6379)
  • :database - Redis Database number (used for isolation. Defaults to 0).
  • :password - Redis authentication password (optional, off by default).
  • :redis_options - Additional options provided to Redix
  • TODO: What about max_reconnection_attempts

Job lifecycle

The job lifecycle starts with an enqueue of a job. This can be done either via Exq or another system like Sidekiq / Resque.

Note that the JobQueue encapsulates much of this logic.

Client (Exq) -> Manager -> Enqueuer

Assuming Exq is used to Enqueue an immediate job, the following is the flow:

  1. Client calls Exq.enqueue(Exq, "queue_name", Worker, ["arg1", "arg2"])

  2. Manager delegates to Enqueuer

  3. Enqueuer does the following:

    • Adds the queue to the "queues" list if not already there.
    • Prepare a job struct with a generated UUID and convert to JSON.
    • Push the job into the correct queue
    • Respond to client with the generated job UUID.

At this point the job is in the correct queue ready to be dequeued.

Manager deq Redis -> Worker (decode & execute job) --> Manager (record)

                                               |
                                               --> Stats (record stats)

The dequeueing of the job is as follows:

  1. The Manager is on a polling cycle, and the :timeout message fires.

  2. Manager tabulates a list of active queues with available workers.

  3. Uses the JobQueue module to fetch jobs. The JobQueue module does this through a single MULT RPOPLPUSH command issued to Redis with the targeted queue.

    This command atomicaly pops an item off the queue and stores the item in a backup queue. The backup queue is keyed off the queue and node id, so each node would have their own backup queue.

    Note that we cannot use a blocking pop since BRPOPLPUSH (unlike BRPOP) is more limited and can only handle a single queue target (see filed issues in Redis / Sidekiq).

  4. Once the jobs are returned to the manager, the manager goes through each job and creates and kicks off an ephemeral Worker process that will handle the job. The manager also does some tabulation to reduce the worker count for those queues.

  5. The worker parses the JSON object, and figures out the worker to call. It also tells Stats to record a itself in process.

  6. The worker then calls "apply" on the correct target module, and tracks the failure or success of the job. Once the job is finished, it tells the Manager and Stats.

  7. If the job is successful, Manager and Stats simply mark the success of the job.

    If the job fails, the Worker module uses the JobQueue module to retry the job if necessary. The retry is done by adding the job to a "retry" queue which is a Sorted Set in Redis. The job is marked with the retry count and scheduled date (using exponential backup). The job is then removed from the backup queue.

  8. If any jobs were fetched from Redis, the Manager will poll again immediately, otherwise if will use the poll_timeout for the next polling.

Retry / Schedule queue

The retry / schedule queue provides functionality for scheduled jobs. This is used both for the enqueue_in method which allows a scheduled job in the future, as well as retry queue, which is used to retry jobs.

Link to this section Summary

Functions

Returns list of active queues with free workers

Returns a specification to start this module under a supervisor

Dequeue jobs and dispatch to workers

Dispatch job to worker if it is not empty Also update worker count for dispatched job

Cleanup host stats on boot

Invoked when the server is started. start_link/3 or start/3 will block until it returns

Rescue GenServer timeout

Link to this section Functions

Link to this function

available_queues(state)

Returns list of active queues with free workers

Link to this function

child_spec(arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

dequeue_and_dispatch(state)

Dequeue jobs and dispatch to workers

Link to this function

dequeue_and_dispatch(state, queues)

Link to this function

dispatch_job(state, potential_job)

Dispatch job to worker if it is not empty Also update worker count for dispatched job

Link to this function

dispatch_job(state, job, queue)

Link to this function

handle_cast(msg, state)

Cleanup host stats on boot

Invoked when the server is started. start_link/3 or start/3 will block until it returns.

args is the argument term (second argument) passed to start_link/3.

Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter its loop.

Returning {:ok, state, timeout} is similar to {:ok, state} except handle_info(:timeout, state) will be called after timeout milliseconds if no messages are received within the timeout.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the process is hibernated before entering the loop. See c:handle_call/3 for more information on hibernation.

Returning {:ok, state, {:continue, continue}} is similar to {:ok, state} except that immediately after entering the loop the c:handle_continue/2 callback will be invoked with the value continue as first argument.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling c:terminate/2. If used when part of a supervision tree the parent supervisor will not fail to start nor immediately try to restart the GenServer. The remainder of the supervision tree will be started and so the GenServer should not be required by other processes. It can be started later with Supervisor.restart_child/2 as the child specification is saved in the parent supervisor. The main use cases for this are:

  • The GenServer is disabled by configuration but might be enabled later.
  • An error occurred and it will be handled by a different mechanism than the Supervisor. Likely this approach involves calling Supervisor.restart_child/2 after a delay to attempt a restart.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling c:terminate/2.

Callback implementation for GenServer.init/1.

Link to this function

job_terminated(exq, namespace, queue, job_serialized)

Link to this function

rescue_timeout(f)

Rescue GenServer timeout.

Link to this function

rescue_timeout(fail_return, f)

Link to this function

server_name(name)

Link to this function

start_link(opts \\ [])