Exq.Manager.Server (exq v0.19.0)

The Manager module is the main orchestrator for the system.

It is also the entry point process (pid) that clients use to interact with the Exq system.

Its responsibilities include:

  • Handle interaction with client and delegate to responsible sub-system
  • Initial Setup of Redis Connection (to be moved to supervisor?).
  • Setup and tracking of in-progress workers / jobs.
  • Poll Redis for new jobs for any queues that have available workers.
  • Handling of queue state and subscriptions (addition and removal)
  • Initial re-hydration of backup queue on system restart to handle any orphan jobs from last system stop.

The Manager is a GenServer with a timed process loop.

Options

  • :concurrency - Default max number of workers to use if not passed in for each queue.
  • :genserver_timeout - Timeout to use for GenServer calls.
  • :max_retries - Maximum number of times to retry a failed job
  • :name - Name of target registered process
  • :namespace - Redis namespace to store all data under. Defaults to "exq".
  • :queues - List of queues to monitor. Can be a list of queue names such as ["q1", "q2"], or a list of tuples with the queue name and max number of concurrent workers: [{"q1", 1}, {"q2", 20}]. If only queue names are passed in, the system uses the default concurrency value for each queue.
  • :redis_timeout - Timeout to use for Redis commands.
  • :poll_timeout - How often to poll Redis for jobs.
  • :scheduler_enable - Whether the scheduler / retry process should be enabled. This defaults to true. Note that if you turn this off, job retries will not be enqueued.
  • :scheduler_poll_timeout - How often to poll Redis for scheduled / retry jobs.
  • :shutdown_timeout - The number of milliseconds to wait for workers to finish processing jobs when the application is shutting down
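
The two accepted shapes of :queues can be folded into a single form. The following is a minimal sketch of that idea, not Exq's internal code: the module QueueSpecSketch and its normalize/2 function are hypothetical, and the default of 100 is only an illustrative value.

```elixir
defmodule QueueSpecSketch do
  # Hypothetical helper, not part of Exq: turns either accepted :queues
  # shape into a list of {queue_name, max_concurrency} tuples.
  def normalize(queues, default_concurrency) do
    Enum.map(queues, fn
      # Already a {name, concurrency} tuple: keep it as given.
      {name, concurrency} when is_binary(name) -> {name, concurrency}
      # Bare queue name: fall back to the default :concurrency value.
      name when is_binary(name) -> {name, default_concurrency}
    end)
  end
end

# Mixed input: "q1" picks up the default, "q2" keeps its own limit.
QueueSpecSketch.normalize(["q1", {"q2", 20}], 100)
# => [{"q1", 100}, {"q2", 20}]
```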

Redis Options (TODO - move to supervisor after refactor):

  • :host - Host name for Redis server (defaults to '127.0.0.1')
  • :port - Redis port (defaults to 6379)
  • :database - Redis Database number (used for isolation. Defaults to 0).
  • :password - Redis authentication password (optional, off by default).
  • :redis_options - Additional options provided to Redix
  • TODO: What about max_reconnection_attempts
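
As an illustration, the options above might be supplied through application configuration roughly as follows. This is a sketch that assumes the options are read from the :exq application environment; every value shown is a placeholder chosen for the example, not a recommended or default setting.

```elixir
# config/config.exs (illustrative values only)
import Config

config :exq,
  name: Exq,
  host: "127.0.0.1",
  port: 6379,
  database: 0,
  namespace: "exq",
  concurrency: 10,
  # "q1" uses the default concurrency above; "q2" is capped at 20 workers.
  queues: ["q1", {"q2", 20}],
  poll_timeout: 50,
  scheduler_enable: true,
  scheduler_poll_timeout: 200,
  max_retries: 25,
  shutdown_timeout: 5000
```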

Job lifecycle

The job lifecycle starts with an enqueue of a job. This can be done either via Exq or another system like Sidekiq / Resque.

Note that the JobQueue encapsulates much of this logic.

Client (Exq) -> Manager -> Enqueuer

Assuming Exq is used to enqueue an immediate job, the flow is as follows:

  1. Client calls Exq.enqueue(Exq, "queue_name", Worker, ["arg1", "arg2"])

  2. Manager delegates to Enqueuer

  3. Enqueuer does the following:

    • Adds the queue to the "queues" list if not already there.
    • Prepares a job struct with a generated UUID and converts it to JSON.
    • Pushes the job into the correct queue.
    • Responds to the client with the generated job UUID.

At this point the job is in the correct queue ready to be dequeued.
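
The Enqueuer steps can be modelled without Redis to make the data flow concrete. This is a sketch under stated assumptions: EnqueueSketch is a hypothetical module, the store is a plain map standing in for Redis, the job field names are illustrative, the id is a random hex string rather than a real UUID, and JSON encoding is left out.

```elixir
defmodule EnqueueSketch do
  # In-memory stand-in for Redis: a set of known queue names plus one
  # list of jobs per queue. All names here are hypothetical.
  def new, do: %{queues: MapSet.new(), lists: %{}}

  # Mirrors the Enqueuer steps: register the queue, build the job with a
  # fresh id, push it onto the queue, and hand the id back to the caller.
  def enqueue(store, queue, worker, args) do
    jid = generate_id()

    job = %{
      jid: jid,
      queue: queue,
      class: inspect(worker),
      args: args,
      enqueued_at: System.system_time(:second)
    }

    store =
      store
      |> Map.update!(:queues, &MapSet.put(&1, queue))
      |> Map.update!(:lists, fn lists ->
        # New jobs go to the head of the list; a consumer pops from the tail.
        Map.update(lists, queue, [job], &[job | &1])
      end)

    {:ok, jid, store}
  end

  # Random 128-bit hex id used as a placeholder for a real UUID.
  defp generate_id do
    :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)
  end
end

{:ok, jid, store} =
  EnqueueSketch.enqueue(EnqueueSketch.new(), "queue_name", Worker, ["arg1", "arg2"])
```

After the call, store.queues contains "queue_name" and the job map sits at the head of store.lists["queue_name"], which corresponds to the state described above: the job is in its queue and ready to be dequeued.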

Manager deq Redis -> Worker (decode & execute job) --> Manager (record)
                                                   |
                                                   --> Stats (record stats)

The dequeueing of the job is as follows:

  1. The Manager is on a polling cycle, and the :timeout message fires.

  2. Manager tabulates a list of active queues with available workers.

  3. Uses the JobQueue module to fetch jobs. The JobQueue module does this through a single MULTI RPOPLPUSH command issued to Redis with the targeted queue.

    This command atomically pops an item off the queue and stores the item in a backup queue. The backup queue is keyed off the queue and node id, so each node has its own backup queue. Note that we cannot use a blocking pop, since BRPOPLPUSH (unlike BRPOP) is more limited and can only handle a single queue target (see filed issues in Redis / Sidekiq).

  4. Once the jobs are returned to the manager, the manager goes through each job and creates and kicks off an ephemeral Worker process that will handle the job. The manager also does some tabulation to reduce the worker count for those queues.

  5. The worker parses the JSON object and figures out the worker to call. It also tells Stats to record itself as in process.

  6. The worker then calls "apply" on the correct target module, and tracks the failure or success of the job. Once the job is finished, it tells the Manager and Stats.

  7. If the job is successful, Manager and Stats simply mark the success of the job.

    If the job fails, the Worker module uses the JobQueue module to retry the job if necessary. The retry is done by adding the job to a "retry" queue, which is a Sorted Set in Redis. The job is marked with the retry count and scheduled date (using exponential backoff). The job is then removed from the backup queue.

  8. If any jobs were fetched from Redis, the Manager will poll again immediately; otherwise it will use the poll_timeout for the next polling.
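
One polling cycle from the steps above can be sketched with plain data structures. This is a hypothetical in-memory model, not Exq's implementation: DequeueSketch, its state shape, and the key names ("queue:..." and "queue:backup::node::queue") are assumptions made for the example, and rpoplpush/3 only imitates the Redis command's semantics on Elixir lists.

```elixir
defmodule DequeueSketch do
  # state: %{lists: %{key => [job]}, free: %{queue => free_workers},
  #          node_id: binary, poll_timeout: milliseconds}

  # Step 2: queues that still have at least one free worker slot.
  def active_queues(%{free: free}) do
    for {queue, n} <- free, n > 0, do: queue
  end

  # Step 3: RPOPLPUSH semantics on plain lists. Takes the tail of the
  # source list and puts it at the head of the destination list.
  def rpoplpush(lists, source, destination) do
    case List.pop_at(Map.get(lists, source, []), -1) do
      {nil, _} ->
        {nil, lists}

      {job, rest} ->
        lists =
          lists
          |> Map.put(source, rest)
          |> Map.update(destination, [job], &[job | &1])

        {job, lists}
    end
  end

  # Steps 3, 4 and 8: fetch at most one job per active queue, reserve a
  # worker slot for each fetched job, and choose the next poll delay.
  def poll_once(state) do
    {jobs, state} =
      Enum.reduce(active_queues(state), {[], state}, fn queue, {jobs, st} ->
        backup = "queue:backup::#{st.node_id}::#{queue}"

        case rpoplpush(st.lists, "queue:#{queue}", backup) do
          {nil, _} ->
            {jobs, st}

          {job, lists} ->
            free = Map.update!(st.free, queue, &(&1 - 1))
            {[{queue, job} | jobs], %{st | lists: lists, free: free}}
        end
      end)

    # Poll again immediately if anything was fetched, else wait poll_timeout.
    next = if jobs == [], do: state.poll_timeout, else: 0
    {Enum.reverse(jobs), state, next}
  end
end

state = %{
  lists: %{"queue:q1" => ["job_b", "job_a"]},
  free: %{"q1" => 1, "q2" => 0},
  node_id: "node1",
  poll_timeout: 50
}

{jobs, state, next_timeout} = DequeueSketch.poll_once(state)
```

In this run only "q1" has a free worker, so the oldest job ("job_a", at the tail) moves to the node's backup list and the next timeout is 0. A second call finds no free workers and falls back to poll_timeout. In a GenServer, the returned delay would typically be passed as the timeout element of the callback's return tuple so that the :timeout message fires again.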

Retry / Schedule queue

The retry / schedule queue provides functionality for scheduled jobs. It backs both the enqueue_in function, which schedules a job to run in the future, and the retry queue, which is used to retry failed jobs.
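
The scheduling idea can be illustrated with a time-ordered list standing in for the Redis Sorted Set. This is a sketch under stated assumptions: RetrySketch is hypothetical, the doubling backoff with a 15 second base is a generic exponential backoff rather than Exq's actual formula (which may differ, for example by adding jitter), and the :dead result for exhausted retries is an assumption of the example.

```elixir
defmodule RetrySketch do
  # Hypothetical backoff: the delay doubles with every retry, starting
  # from base_seconds. Not taken from Exq's source.
  def backoff_seconds(retry_count, base_seconds \\ 15) do
    base_seconds * Integer.pow(2, retry_count)
  end

  # The retry set is modelled as a list of {run_at, job} pairs kept sorted
  # by run_at, similar to how a Sorted Set orders members by score.
  def schedule_retry(retry_set, job, now, max_retries) do
    retry_count = Map.get(job, :retry_count, 0) + 1

    if retry_count > max_retries do
      # Retries exhausted: the job is not rescheduled.
      {:dead, retry_set}
    else
      run_at = now + backoff_seconds(retry_count)
      job = Map.put(job, :retry_count, retry_count)
      {:retry, Enum.sort_by([{run_at, job} | retry_set], &elem(&1, 0))}
    end
  end

  # Scheduler poll: split off every entry whose scheduled time has come.
  def due(retry_set, now) do
    Enum.split_while(retry_set, fn {run_at, _job} -> run_at <= now end)
  end
end

# First failure at t = 1000 s: the job is scheduled 30 s later.
{:retry, set} = RetrySketch.schedule_retry([], %{jid: "abc"}, 1_000, 25)
{ready, waiting} = RetrySketch.due(set, 1_030)
```

A job scheduled for a future time fits the same model: it is an entry whose run_at is the requested time, and the scheduler moves it to its regular queue once due/2 returns it.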

Summary

Functions

Returns list of active queues with free workers

Returns a specification to start this module under a supervisor.

Dequeue jobs and dispatch to workers

Dispatch job to worker if it is not empty. Also update worker count for dispatched job.

Cleanup host stats on boot

Callback implementation for GenServer.init/1.

Functions

Returns list of active queues with free workers

Returns a specification to start this module under a supervisor.

See Supervisor.

dequeue_and_dispatch(state)

Dequeue jobs and dispatch to workers

dispatch_job(state, potential_job)

Dispatch job to worker if it is not empty. Also update worker count for dispatched job.

dispatch_job(state, job, queue)

Cleanup host stats on boot

Callback implementation for GenServer.init/1.

job_terminated(exq, queue, success)
