exq v0.13.2 Exq.Manager.Server
The Manager module is the main orchestrator for the system.
It is also the entry-point process (the pid) used by the client to interact with the Exq system.
Its responsibilities include:
- Handling interaction with the client and delegating to the responsible sub-system.
- Initial setup of the Redis connection (to be moved to the supervisor?).
- Setup and tracking of in-progress workers / jobs.
- Polling Redis for new jobs for any queues that have available workers.
- Handling of queue state and subscriptions (addition and removal).
- Initial re-hydration of the backup queue on system restart, to handle any orphaned jobs from the last system stop.
The Manager is a GenServer with a timed process loop.
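A "timed process loop" in GenServer terms usually means returning a timeout from each callback so that handle_info(:timeout, state) fires when no other message arrives in time. The module below is a minimal, hypothetical sketch of that pattern (PollingLoopSketch and its :poll_timeout option are illustrative names, not Exq's actual Manager); it only counts polls instead of talking to Redis.

```elixir
defmodule PollingLoopSketch do
  # Hypothetical, stripped-down timed loop; not Exq's real Manager implementation.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Returns how many times the :timeout poll has fired so far.
  def polls(pid), do: GenServer.call(pid, :polls)

  @impl true
  def init(opts) do
    poll_timeout = Keyword.get(opts, :poll_timeout, 50)
    # A timeout of 0 makes handle_info(:timeout, state) run right after init.
    {:ok, %{poll_timeout: poll_timeout, polls: 0}, 0}
  end

  @impl true
  def handle_info(:timeout, state) do
    # This is where a real manager would fetch and dispatch jobs.
    state = %{state | polls: state.polls + 1}
    {:noreply, state, state.poll_timeout}
  end

  @impl true
  def handle_call(:polls, _from, state) do
    # Any message resets the GenServer timeout, so every callback must
    # return it again or the loop stops.
    {:reply, state.polls, state, state.poll_timeout}
  end
end
```

The key design point is that the timeout is per-callback: forgetting to return it from handle_call/3 would silently stop the polling.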
Options
:concurrency
- Default maximum number of workers to use for each queue when none is passed in.
:genserver_timeout
- Timeout to use for GenServer calls.
:max_retries
- Maximum number of times to retry a failed job.
:name
- Name of the target registered process.
:namespace
- Redis namespace to store all data under. Defaults to "exq".
:queues
- List of queues to monitor. This can be a list of queue names such as ["q1", "q2"], or a list of tuples with the queue name and the maximum number of concurrent workers: [{"q1", 1}, {"q2", 20}]. If only a list of names is passed in, the system uses the default :concurrency value for each queue.
:redis_timeout
- Timeout to use for Redis commands.
:poll_timeout
- How often to poll Redis for jobs.
:scheduler_enable
- Whether the scheduler / retry process should be enabled. Defaults to true. Note that if you turn this off, job retries will not be enqueued.
:scheduler_poll_timeout
- How often to poll Redis for scheduled / retry jobs.
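The :queues option accepts either bare names or {name, max_workers} tuples, falling back to :concurrency for bare names. A hypothetical helper (QueueOptionSketch.normalize/2 is an illustrative name, not part of Exq's API) that turns both forms into tuples could look like this; note that it also tolerates a mixed list, which the option description does not promise.

```elixir
defmodule QueueOptionSketch do
  # Hypothetical helper: normalizes the :queues option into {name, max_workers} tuples.
  def normalize(queues, default_concurrency) do
    Enum.map(queues, fn
      # Explicit per-queue concurrency is kept as-is.
      {name, concurrency} when is_binary(name) -> {name, concurrency}
      # A bare queue name falls back to the default :concurrency value.
      name when is_binary(name) -> {name, default_concurrency}
    end)
  end
end
```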
Redis Options (TODO - move to supervisor after refactor):
:host
- Host name for the Redis server (defaults to '127.0.0.1').
:port
- Redis port (defaults to 6379).
:database
- Redis database number (used for isolation; defaults to 0).
:password
- Redis authentication password (optional, off by default).
:redis_options
- Additional options provided to Redix.
- TODO: What about max_reconnection_attempts?
Job lifecycle
The job lifecycle starts with an enqueue of a job. This can be done either via Exq or another system like Sidekiq / Resque.
Note that the JobQueue encapsulates much of this logic.
Client (Exq) -> Manager -> Enqueuer
Assuming Exq is used to enqueue an immediate job, the flow is as follows:
Client calls Exq.enqueue(Exq, "queue_name", Worker, ["arg1", "arg2"]).
Manager delegates to Enqueuer.
Enqueuer does the following:
- Adds the queue to the "queues" list if not already there.
- Prepares a job struct with a generated UUID and converts it to JSON.
- Pushes the job into the correct queue.
- Responds to the client with the generated job UUID.
At this point the job is in the correct queue ready to be dequeued.
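The Enqueuer steps above can be sketched as building a job map and the two Redis commands that register the queue and push the job. This is a hypothetical illustration, not Exq's source: the key layout ("<namespace>:queues", "<namespace>:queue:<name>") and the job fields ("class", "args", "jid", "enqueued_at") are assumptions modeled on the Sidekiq-compatible format, the random hex id stands in for a real UUID library, and JSON encoding is passed in as a function (for example a JSON library's encoder) so the sketch stays dependency-free.

```elixir
defmodule EnqueueSketch do
  # Hypothetical sketch of the Enqueuer's steps; key names and job fields are
  # assumptions modeled on the Sidekiq-compatible layout.

  # 128 random bits rendered as 32 lowercase hex characters (stand-in for a UUID).
  def generate_jid do
    :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)
  end

  # Prepares the job map that would be serialized to JSON.
  def build_job(queue, worker, args, jid \\ generate_jid()) do
    %{
      "queue" => queue,
      "class" => inspect(worker),
      "args" => args,
      "jid" => jid,
      "enqueued_at" => System.os_time(:millisecond) / 1000
    }
  end

  # Returns the Redis commands to issue plus the job id to reply with.
  # `encode` is the JSON encoder to use; none is bundled here.
  def enqueue_commands(namespace, queue, job, encode) do
    {[
       # Add the queue to the "queues" set if it is not already there.
       ["SADD", "#{namespace}:queues", queue],
       # Push the serialized job onto the queue's list.
       ["LPUSH", "#{namespace}:queue:#{queue}", encode.(job)]
     ], job["jid"]}
  end
end
```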
Manager deq Redis -> Worker (decode & execute job) --> Manager (record)
                     |
                     --> Stats (record stats)
The dequeueing of the job is as follows:
The Manager is on a polling cycle, and the :timeout message fires.
The Manager tabulates a list of active queues with available workers.
It uses the JobQueue module to fetch jobs. The JobQueue module does this through a single MULTI / RPOPLPUSH transaction issued to Redis for the targeted queue.
RPOPLPUSH atomically pops an item off the queue and stores it in a backup queue. The backup queue is keyed off the queue and node id, so each node has its own backup queue.
Note that we cannot use a blocking pop, since BRPOPLPUSH (unlike BRPOP) is more limited and can only handle a single queue target (see the issues filed in Redis / Sidekiq).
Once the jobs are returned to the Manager, it goes through each job and creates and kicks off an ephemeral Worker process to handle it. The Manager also updates its tabulation to reduce the available worker count for those queues.
The Worker parses the JSON object and figures out which worker module to call. It also tells Stats to record the job as in progress.
The Worker then calls "apply" on the correct target module and tracks the failure or success of the job. Once the job is finished, it tells the Manager and Stats.
If the job is successful, the Manager and Stats simply mark the success of the job.
If the job fails, the Worker module uses the JobQueue module to retry the job if necessary. The retry is done by adding the job to a "retry" queue, which is a Sorted Set in Redis. The job is marked with the retry count and scheduled date (using exponential backoff). The job is then removed from the backup queue.
If any jobs were fetched from Redis, the Manager polls again immediately; otherwise it uses the poll_timeout for the next poll.
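The backup-queue mechanism described above is the "reliable queue" pattern. The in-memory model below is a hypothetical sketch of its semantics, not Exq code: Elixir lists stand in for Redis lists (list head = Redis left end), ack/2 plays the role of removing a finished job from the backup list, and rehydrate/2 assumes orphans are moved back by repeated RPOPLPUSH from the backup list to the queue.

```elixir
defmodule ReliableQueueSketch do
  # In-memory model of RPOPLPUSH semantics; the head of each Elixir list is
  # the Redis "left" end.

  # RPOPLPUSH: pop from the right of `queue`, push onto the left of `backup`.
  def rpoplpush([], backup), do: {nil, [], backup}

  def rpoplpush(queue, backup) do
    {rest, [job]} = Enum.split(queue, -1)
    {job, rest, [job | backup]}
  end

  # When a job finishes (or is moved to the retry set), it leaves the backup list.
  def ack(backup, job), do: List.delete(backup, job)

  # On restart, anything left in backup is an orphan. Repeated RPOPLPUSH from
  # backup to queue moves the oldest orphan first, which leaves the backup
  # list's order prepended to the queue.
  def rehydrate(queue, backup), do: {backup ++ queue, []}
end
```

Because the pop and the backup push happen in one atomic command, a node that crashes mid-job leaves the job in its own backup list rather than losing it.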
Retry / Schedule queue
The retry / schedule queue provides functionality for scheduled jobs. It is used both for the enqueue_in method, which schedules a job to run in the future, and for the retry queue, which is used to retry failed jobs.
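Both uses come down to computing a Sorted Set score: the Unix time at which the job becomes due. The sketch below is hypothetical; in particular the backoff formula (count^4 + 15 + jitter * (count + 1) seconds) is the Sidekiq-style one, used here as an assumption rather than Exq's confirmed behavior, and the jitter is injectable so the result can be checked deterministically.

```elixir
defmodule RetryScheduleSketch do
  # Hypothetical helpers for the retry / schedule sorted sets.

  # enqueue_in: the job is due `delay_seconds` from now.
  def schedule_score(now_seconds, delay_seconds), do: now_seconds + delay_seconds

  # Returns {:retry, score, updated_job}, or {:exhausted, job} once max_retries is hit.
  # The backoff formula is an assumed Sidekiq-style exponential backoff.
  def next_retry(job, now_seconds, max_retries, jitter_fun \\ fn -> :rand.uniform(30) end) do
    count = Map.get(job, "retry_count", 0)

    if count >= max_retries do
      {:exhausted, job}
    else
      delay = :math.pow(count, 4) + 15 + jitter_fun.() * (count + 1)
      {:retry, now_seconds + delay, Map.put(job, "retry_count", count + 1)}
    end
  end
end
```

The score would then be used with ZADD on the retry (or schedule) set, and the scheduler process polls for members whose score is at or below the current time.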
Summary
Functions
Returns list of active queues with free workers
Returns a specification to start this module under a supervisor
Dequeue jobs and dispatch to workers
Dispatch job to worker if it is not empty. Also update worker count for dispatched job
Clean up host stats on boot
Invoked when the server is started. start_link/3 or start/3 will block until it returns
Rescue GenServer timeout
Functions
available_queues(state)
Returns list of active queues with free workers
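As a rough illustration of what "active queues with free workers" means, the hypothetical sketch below filters queues whose busy count is below their maximum. The state shape (%{queues: %{name => %{max: n, busy: m}}}) is an assumption for illustration; Exq tracks worker counts in its own internal structures.

```elixir
defmodule AvailableQueuesSketch do
  # Hypothetical state shape: %{queues: %{name => %{max: n, busy: m}}}.
  def available_queues(%{queues: queues}) do
    queues
    # Keep only queues that still have a free worker slot.
    |> Enum.filter(fn {_name, %{max: max, busy: busy}} -> busy < max end)
    |> Enum.map(fn {name, _counts} -> name end)
    |> Enum.sort()
  end
end
```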
child_spec(arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
dequeue_and_dispatch(state)
Dequeue jobs and dispatch to workers
dequeue_and_dispatch(state, queues)
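One polling pass, as described in the job lifecycle, can be sketched as: fetch at most one job per available queue, dispatch the non-empty results, and choose the next poll delay. In this hypothetical sketch, `fetch` stands in for the JobQueue / Redis call and `dispatch` for starting a Worker; both are illustrative parameters, not Exq's signature.

```elixir
defmodule DequeueSketch do
  # Hypothetical sketch: `fetch` returns a job or nil for a queue,
  # `dispatch` receives (queue, job) for each fetched job.
  # Returns the delay in milliseconds before the next poll.
  def dequeue_and_dispatch(queues, poll_timeout, fetch, dispatch) do
    jobs =
      queues
      |> Enum.map(fn queue -> {queue, fetch.(queue)} end)
      |> Enum.reject(fn {_queue, job} -> is_nil(job) end)

    Enum.each(jobs, fn {queue, job} -> dispatch.(queue, job) end)

    # Poll again immediately if anything was fetched, otherwise wait poll_timeout ms.
    if jobs == [], do: poll_timeout, else: 0
  end
end
```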
dispatch_job(state, potential_job)
Dispatch job to worker if it is not empty. Also update worker count for dispatched job.
dispatch_job(state, job, queue)
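The "if it is not empty" behavior maps naturally onto a nil clause. This hypothetical sketch of the three-argument form uses the same assumed state shape as above (%{queues: %{name => %{max: n, busy: m}}}) and only records the bookkeeping; the real function would also start the ephemeral Worker process.

```elixir
defmodule DispatchSketch do
  # Hypothetical state shape: %{queues: %{name => %{max: n, busy: m}}}.

  # An empty fetch leaves the state unchanged.
  def dispatch_job(state, nil, _queue), do: state

  # A real implementation would start a Worker for the job here; this sketch
  # only bumps the busy count so fewer workers are available for `queue`.
  def dispatch_job(state, _job, queue) do
    update_in(state, [:queues, queue, :busy], &(&1 + 1))
  end
end
```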
handle_cast(msg, state)
Clean up host stats on boot.
init(opts)
Invoked when the server is started. start_link/3 or start/3 will block until it returns.
args is the argument term (second argument) passed to start_link/3.
Returning {:ok, state} will cause start_link/3 to return {:ok, pid} and the process to enter its loop.
Returning {:ok, state, timeout} is similar to {:ok, state}, except that handle_info(:timeout, state) will be called after timeout milliseconds if no messages are received within the timeout.
Returning {:ok, state, :hibernate} is similar to {:ok, state}, except the process is hibernated before entering the loop. See c:handle_call/3 for more information on hibernation.
Returning {:ok, state, {:continue, continue}} is similar to {:ok, state}, except that immediately after entering the loop the c:handle_continue/2 callback will be invoked with the value continue as first argument.
Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling c:terminate/2. If used when part of a supervision tree, the parent supervisor will not fail to start nor immediately try to restart the GenServer. The remainder of the supervision tree will be started, and so the GenServer should not be required by other processes. It can be started later with Supervisor.restart_child/2, as the child specification is saved in the parent supervisor. The main use cases for this are:
- The GenServer is disabled by configuration but might be enabled later.
- An error occurred and it will be handled by a different mechanism than the Supervisor. Likely this approach involves calling Supervisor.restart_child/2 after a delay to attempt a restart.
Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling c:terminate/2.
Callback implementation for GenServer.init/1.
job_terminated(exq, namespace, queue, job_serialized)
rescue_timeout(f)
Rescue GenServer timeout.
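A GenServer.call/3 that times out makes the caller exit with {:timeout, _}. "Rescuing" it therefore means catching that exit, not rescuing an exception. The sketch below is hypothetical (the fallback parameter is an illustrative addition, not necessarily part of Exq's rescue_timeout/1); exits that do not match the timeout pattern propagate unchanged.

```elixir
defmodule RescueTimeoutSketch do
  # Runs `f` and turns a GenServer call timeout exit into `fallback`
  # instead of crashing the caller. Other exits propagate unchanged.
  def rescue_timeout(f, fallback \\ nil) do
    try do
      f.()
    catch
      :exit, {:timeout, _call} -> fallback
    end
  end
end
```

For example, wrapping a call to a process that never replies returns the fallback after the call's timeout instead of taking down the caller.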