gaffer (gaffer v0.7.0)
Main API for managing job queues.
Types
-type age() :: non_neg_integer() | infinity.
An age in milliseconds.
-type backoff() :: non_neg_integer() | [non_neg_integer()].
Retry backoff strategy.
Per-terminal-state forwarding targets.
-type interval() :: pos_integer() | infinity.
An interval in milliseconds.
-type job() :: #{id := job_id(), queue := queue(), payload := term(), state := job_state(), attempt := non_neg_integer(), max_attempts := max_attempts(), priority := priority(), timeout := timeout_ms(), backoff := backoff(), shutdown_timeout := shutdown_timeout(), result => term(), scheduled_at => timestamp(), created_at := timestamp(), attempted_at => timestamp(), completed_at => timestamp(), cancelled_at => timestamp(), failed_at => timestamp(), errors := [job_error()]}.
A job.
-type job_error() :: #{attempt := non_neg_integer(), error := term(), at := timestamp()}.
A recorded execution error.
-type job_filter() :: #{state => job_state()}.
Filter options for listing jobs.
-type job_id() :: keysmith:uuid().
Unique job identifier.
-type job_opts() :: #{queue => queue(), max_attempts => max_attempts(), priority => priority(), timeout => timeout_ms(), backoff => backoff(), shutdown_timeout => shutdown_timeout(), scheduled_at => timestamp()}.
Per-job options at insert time.
-type job_state() :: available | executing | completed | cancelled | failed.
Possible states of a job.
Maximum age per job state, in milliseconds.
-type max_attempts() :: pos_integer().
Maximum execution attempts for a job.
-type max_workers() :: pos_integer() | infinity.
Maximum number of concurrent workers.
-type priority() :: integer().
Job priority. Higher values are processed first.
Pruning configuration for a queue.
When set, a per-queue pruner process periodically deletes jobs in terminal
states older than the configured max_age (in milliseconds).
-type queue() :: atom().
Queue identifier.
-type queue_conf() :: #{name := queue(), driver => gaffer_driver:driver(), worker := gaffer_worker:worker(), global_max_workers => max_workers(), max_workers => max_workers(), poll_interval => interval(), shutdown_timeout => shutdown_timeout(), max_attempts => max_attempts(), timeout => timeout_ms(), backoff => backoff(), priority => priority(), forward => forward(), hooks => [gaffer_hooks:hook()], prune => prune_conf()}.
Queue configuration.
-type queue_info() :: #{status := active | paused, workers := #{active := non_neg_integer(), max := #{local := max_workers(), global := max_workers()}}}.
Information about a queue.
-type queue_updates() :: #{worker => gaffer_worker:worker(), global_max_workers => max_workers(), max_workers => max_workers(), poll_interval => interval(), shutdown_timeout => shutdown_timeout(), max_attempts => max_attempts(), timeout => timeout_ms(), backoff => backoff(), priority => priority(), forward => forward(), hooks => [gaffer_hooks:hook()], prune => prune_conf()}.
Partial queue configuration for updates.
-type shutdown_timeout() :: pos_integer().
Grace period for worker shutdown in milliseconds.
-type timeout_ms() :: pos_integer().
Execution timeout in milliseconds.
-type timestamp() :: integer() | {erlang:time_unit(), integer()}.
Job timestamp.
An erlang:system_time/0 integer or a {Unit, Value} pair.
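The two accepted timestamp shapes can be illustrated with a small normalisation helper. This is a minimal sketch, not gaffer's own code: the module name timestamp_sketch and the function to_native/1 are hypothetical, and it assumes that a bare integer is already in native time units, as returned by erlang:system_time/0.

```erlang
-module(timestamp_sketch).
-export([to_native/1]).

%% Normalise a timestamp() value to native time units.
%% Assumption: a bare integer is already in native units, which is what
%% erlang:system_time/0 returns. This is not gaffer's implementation.
-spec to_native(integer() | {erlang:time_unit(), integer()}) -> integer().
to_native(Native) when is_integer(Native) ->
    Native;
to_native({Unit, Value}) when is_integer(Value) ->
    %% erlang:convert_time_unit/3 converts Value from Unit to native.
    erlang:convert_time_unit(Value, Unit, native).
```

Under that assumption, {millisecond, erlang:system_time(millisecond)} and erlang:system_time() describe roughly the same instant, so either form could be passed where a timestamp() is expected, for example as scheduled_at in job_opts().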
Job Management
Cancels a job, preventing further execution.
Only jobs that have not yet started executing can be cancelled. A job
already in the executing state returns
{error, {invalid_transition, {executing, cancelled}}}.
Deletes a job.
-spec drain(queue()) -> ok.
Equivalent to drain(Queue, 5000).
Waits for the active workers to finish their jobs.
-spec flush(queue()) -> ok.
Equivalent to flush(Queue, infinity).
Waits for all jobs in the queue to finish.
Gets the definition of a job.
Equivalent to insert(Queue, Payload, #{}).
Inserts a job into a queue.
Lists all jobs in the given queue.
Warning
This operation may scan every job in the queue, so it can be slow and can block other queue operations while it runs.
-spec list(queue(), job_filter()) -> [job()].
Lists jobs in the given queue matching the filter options.
Warning
This operation may scan every job in the queue, so it can be slow and can block other queue operations while it runs.
Triggers an immediate prune of stale jobs in the given queue.
Queue Management
-spec create_queue(queue_conf()) -> ok | {error, already_exists}.
Creates a new queue.
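As an illustration of the spec above, a queue could be set up at application start roughly as follows. This is a sketch under assumptions: the module queue_setup_sketch, the queue name emails, and the worker module my_email_worker are hypothetical, the worker is assumed to be given as a module name, and the optional driver key is left out. The numeric values are arbitrary.

```erlang
-module(queue_setup_sketch).
-export([conf/0, start/0]).

%% Build a queue_conf() map. Only name and worker are mandatory keys.
conf() ->
    #{name => emails,              % hypothetical queue name
      worker => my_email_worker,   % hypothetical worker module (assumed shape)
      max_workers => 5,
      poll_interval => 1000,       % milliseconds
      max_attempts => 3,
      timeout => 30000,            % milliseconds
      backoff => 1000}.            % a single non_neg_integer() value

%% Create the queue and tolerate the case where it already exists.
start() ->
    case gaffer:create_queue(conf()) of
        ok -> ok;
        {error, already_exists} -> ok
    end.
```

Note that this leaves an existing queue's configuration untouched. If the stored configuration should be brought in line with conf/0, ensure_queue/1 is documented as creating the queue or updating it.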
-spec delete_queue(queue()) -> ok.
Deletes a queue.
-spec delete_queue(queue(), gaffer_driver:driver()) -> ok.
Deletes a queue using an explicit driver.
Also works for orphaned queues not initialized at runtime, as returned by
orphaned_queues/1.
-spec ensure_queue(queue_conf()) -> ok.
Creates a queue or updates it if it already exists.
-spec get_queue(queue()) -> queue_conf().
Gets the configuration of a queue.
-spec info(queue()) -> queue_info().
Returns current queue runner information.
Reports the queue status (active or paused), the number of active workers, and the local and global worker limits.
For other metrics, observe lifecycle events via hooks. See gaffer_hooks for
collecting metrics off the event stream, or list/2 for ad-hoc inspection.
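The shape of queue_info() lends itself to small derived checks. The sketch below computes how many more workers a queue could start locally. The module queue_info_sketch and the function free_slots/1 are hypothetical, and comparing the active count against the local limit is an assumption about what the count covers.

```erlang
-module(queue_info_sketch).
-export([free_slots/1]).

%% Derive the number of free local worker slots from a queue_info() map.
%% Assumption: workers.active is counted against workers.max.local.
free_slots(#{status := paused}) ->
    %% A paused queue does not claim new jobs, so treat it as full.
    0;
free_slots(#{workers := #{max := #{local := infinity}}}) ->
    infinity;
free_slots(#{workers := #{active := Active, max := #{local := Max}}}) ->
    max(0, Max - Active).
```

A call such as queue_info_sketch:free_slots(gaffer:info(emails)) would apply it to a live queue, where emails is a hypothetical queue name.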
-spec list_queues() -> [{queue(), queue_conf()}].
Lists all queues.
-spec orphaned_queues(gaffer_driver:driver()) -> [queue()].
Lists queues in storage that are not in the runtime configuration.
-spec pause(queue()) -> ok | {error, already_paused}.
Pauses a queue.
Running jobs continue to completion. While paused, the queue does not
claim new jobs and pruning is suspended until resume/1 is called.
Explicit calls to prune/1 still run while the queue is paused.
Returns {error, already_paused} if the queue is already paused.
-spec resume(queue()) -> ok | {error, already_active}.
Resumes a paused queue.
Job claiming and pruning resume normally.
Returns {error, already_active} if the queue is not currently paused.
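Because pause/1 and resume/1 report the already-in-that-state case as an error, callers that only care about the end state may want idempotent wrappers. A minimal sketch, with the hypothetical module name pause_sketch and hypothetical function names:

```erlang
-module(pause_sketch).
-export([ensure_paused/1, ensure_active/1, idempotent/1]).

%% Pause the queue; succeed even if it was already paused.
ensure_paused(Queue) ->
    idempotent(gaffer:pause(Queue)).

%% Resume the queue; succeed even if it was already active.
ensure_active(Queue) ->
    idempotent(gaffer:resume(Queue)).

%% Collapse the documented "already in that state" errors into ok.
%% Any other return value crashes with function_clause on purpose.
idempotent(ok) -> ok;
idempotent({error, already_paused}) -> ok;
idempotent({error, already_active}) -> ok.
```

Only the return values listed in the two specs are handled; anything else is treated as unexpected.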
-spec update_queue(queue(), queue_updates()) -> ok.
Updates the configuration of a queue.
Accepts any subset of queue_conf/0 keys except name and driver. The
update is deep-merged into the current configuration, so nested fields can be
tweaked individually (e.g. prune => #{interval => 1000} keeps the existing
prune.max_age).
Deep-merge means a key cannot be removed from a nested map via update (e.g.
dropping the failed entry from forward, or replacing prune.max_age
wholesale). Use ensure_queue/1 with the full configuration for that. Lists,
including hooks, are replaced wholesale; passing hooks => [] clears them.
Swapping worker is not atomic: in-flight jobs finish under the previous
module, and the new module is picked up on the next poll. The worker and
hooks values are not type-validated at update time.
Passing name or driver raises {invalid_queue_conf, #{not_updatable => [...]}}.
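The merge rules described above can be modelled in a few lines of plain Erlang. This is a sketch of the documented behaviour, not gaffer's implementation: the module deep_merge_sketch is hypothetical, and the concrete prune and hooks values in the usage note are placeholders whose shapes are assumed for illustration.

```erlang
-module(deep_merge_sketch).
-export([deep_merge/2]).

%% Recursively merge Updates into Current.
%% Nested maps are merged key by key, so keys absent from Updates survive.
%% Any other value, including a list, replaces the old value wholesale.
deep_merge(Current, Updates) when is_map(Current), is_map(Updates) ->
    maps:fold(
        fun(Key, New, Acc) ->
            case Acc of
                #{Key := Old} when is_map(Old), is_map(New) ->
                    Acc#{Key => deep_merge(Old, New)};
                _ ->
                    Acc#{Key => New}
            end
        end,
        Current,
        Updates
    ).
```

Merging #{prune => #{interval => 1000}, hooks => []} into a configuration that holds prune => #{interval => 5000, max_age => #{completed => 60000}} and hooks => [some_hook] keeps the existing max_age, changes the interval to 1000, and clears hooks. It also shows why an update cannot remove a nested key: the fold only ever adds or overwrites entries.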