gaffer (gaffer v0.7.0)

Main API for managing job queues.

Summary

Types

age()

An age in milliseconds.

backoff()

Retry backoff strategy.

forward()

Per-terminal-state forwarding targets.

interval()

An interval in milliseconds.

job()

A job.

job_error()

A recorded execution error.

job_filter()

Filter options for listing jobs.

job_id()

Unique job identifier.

job_opts()

Per-job options at insert time.

job_state()

Possible states of a job.

max_age()

Maximum age per job state, in milliseconds.

max_attempts()

Maximum execution attempts for a job.

max_workers()

Maximum number of concurrent workers.

priority()

Job priority. Higher values are processed first.

prune_conf()

Pruning configuration for a queue.

queue()

Queue identifier.

queue_conf()

Queue configuration.

queue_info()

Information about a queue.

queue_updates()

Partial queue configuration for updates.

shutdown_timeout()

Grace period for worker shutdown in milliseconds.

timeout_ms()

Execution timeout in milliseconds.

timestamp()

Job timestamp.

Job Management

cancel(Queue, ID)

Cancels a job, preventing further execution.

delete(Queue, ID)

Deletes a job.

drain(Queue, Timeout)

Waits for the active workers to finish their jobs.

flush(Queue, Timeout)

Waits for all jobs in the queue to finish.

get(Queue, ID)

Gets the definition of a job.

insert(Queue, Payload, Opts)

Inserts a job into a queue.

list(Queue)

Lists all jobs in the given queue.

list(Queue, Filters)

Lists jobs in the given queue matching the filter options.

prune(Queue)

Triggers an immediate prune of stale jobs in the given queue.

Queue Management

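create_queue(Conf)

Creates a new queue.

delete_queue(Name)

Deletes a queue.

delete_queue(Name, Driver)

Deletes a queue using an explicit driver.

ensure_queue(Conf)

Creates a queue or updates it if it already exists.

get_queue(Name)

Gets the configuration of a queue.

info(Queue)

Returns current queue runner information.

list_queues()

Lists all queues.

orphaned_queues(Driver)

Lists queues in storage that are not in the runtime configuration.

pause(Queue)

Pauses a queue.

resume(Queue)

Resumes a paused queue.

update_queue(Name, Updates)

Updates the configuration of a queue.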

Types

age()

-type age() :: non_neg_integer() | infinity.

An age in milliseconds.

backoff()

-type backoff() :: non_neg_integer() | [non_neg_integer()].

Retry backoff strategy.

forward()

-type forward() :: #{completed => queue(), failed => queue(), cancelled => queue()}.

Per-terminal-state forwarding targets.

interval()

-type interval() :: pos_integer() | infinity.

An interval in milliseconds.

job()

-type job() ::
          #{id := job_id(),
            queue := queue(),
            payload := term(),
            state := job_state(),
            attempt := non_neg_integer(),
            max_attempts := max_attempts(),
            priority := priority(),
            timeout := timeout_ms(),
            backoff := backoff(),
            shutdown_timeout := shutdown_timeout(),
            result => term(),
            scheduled_at => timestamp(),
            created_at := timestamp(),
            attempted_at => timestamp(),
            completed_at => timestamp(),
            cancelled_at => timestamp(),
            failed_at => timestamp(),
            errors := [job_error()]}.

A job.

job_error()

-type job_error() :: #{attempt := non_neg_integer(), error := term(), at := timestamp()}.

A recorded execution error.

job_filter()

-type job_filter() :: #{state => job_state()}.

Filter options for listing jobs.

job_id()

-type job_id() :: keysmith:uuid().

Unique job identifier.

job_opts()

-type job_opts() ::
          #{queue => queue(),
            max_attempts => max_attempts(),
            priority => priority(),
            timeout => timeout_ms(),
            backoff => backoff(),
            shutdown_timeout => shutdown_timeout(),
            scheduled_at => timestamp()}.

Per-job options at insert time.

job_state()

-type job_state() :: available | executing | completed | cancelled | failed.

Possible states of a job.

max_age()

-type max_age() :: #{job_state() | '_' => age()}.

Maximum age per job state, in milliseconds.

max_attempts()

-type max_attempts() :: pos_integer().

Maximum execution attempts for a job.

max_workers()

-type max_workers() :: pos_integer() | infinity.

Maximum number of concurrent workers.

priority()

-type priority() :: integer().

Job priority. Higher values are processed first.

prune_conf()

-type prune_conf() :: #{interval := interval(), max_age => max_age()}.

Pruning configuration for a queue.

When set, a per-queue pruner process periodically deletes jobs in terminal states older than the configured max_age (in milliseconds).
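
As an illustration of the pruning configuration described above, a queue could be created roughly as follows. This is a sketch under assumptions: the queue name emails and the worker module my_email_worker are hypothetical, the worker is assumed to be given as a module name, and the ages are arbitrary example values.

```erlang
%% Hypothetical queue configuration. my_email_worker is an illustrative
%% worker module and is not part of gaffer.
Conf = #{name => emails,
         worker => my_email_worker,
         max_workers => 5,
         %% interval is the only required key of prune_conf().
         prune => #{interval => 60000,                  %% prune once per minute
                    max_age => #{completed => 3600000,  %% keep completed jobs 1 hour
                                 failed => 86400000}}}, %% keep failed jobs 24 hours
ok = gaffer:create_queue(Conf).
```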

queue()

-type queue() :: atom().

Queue identifier.

queue_conf()

-type queue_conf() ::
          #{name := queue(),
            driver => gaffer_driver:driver(),
            worker := gaffer_worker:worker(),
            global_max_workers => max_workers(),
            max_workers => max_workers(),
            poll_interval => interval(),
            shutdown_timeout => shutdown_timeout(),
            max_attempts => max_attempts(),
            timeout => timeout_ms(),
            backoff => backoff(),
            priority => priority(),
            forward => forward(),
            hooks => [gaffer_hooks:hook()],
            prune => prune_conf()}.

Queue configuration.

queue_info()

-type queue_info() ::
          #{status := active | paused,
            workers :=
                #{active := non_neg_integer(),
                  max := #{local := max_workers(), global := max_workers()}}}.

Information about a queue.

queue_updates()

-type queue_updates() ::
          #{worker => gaffer_worker:worker(),
            global_max_workers => max_workers(),
            max_workers => max_workers(),
            poll_interval => interval(),
            shutdown_timeout => shutdown_timeout(),
            max_attempts => max_attempts(),
            timeout => timeout_ms(),
            backoff => backoff(),
            priority => priority(),
            forward => forward(),
            hooks => [gaffer_hooks:hook()],
            prune => prune_conf()}.

Partial queue configuration for updates.

shutdown_timeout()

-type shutdown_timeout() :: pos_integer().

Grace period for worker shutdown in milliseconds.

timeout_ms()

-type timeout_ms() :: pos_integer().

Execution timeout in milliseconds.

timestamp()

-type timestamp() :: integer() | {erlang:time_unit(), integer()}.

Job timestamp.

Either an integer in the native time unit, as returned by erlang:system_time/0, or a {Unit, Value} pair.
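
The two timestamp shapes can be brought to a common unit with erlang:convert_time_unit/3. The helper below is a minimal sketch and not part of gaffer: the module name timestamp_example and the function to_millis/1 are made up for illustration, and bare integers are assumed to be in the native time unit.

```erlang
-module(timestamp_example).
-export([to_millis/1]).

%% Converts a timestamp() value to milliseconds.
%% Assumption: a bare integer is in the native time unit, as returned
%% by erlang:system_time/0.
to_millis(Native) when is_integer(Native) ->
    erlang:convert_time_unit(Native, native, millisecond);
to_millis({Unit, Value}) when is_integer(Value) ->
    erlang:convert_time_unit(Value, Unit, millisecond).
```

For example, to_millis({second, 2}) evaluates to 2000.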

Job Management

cancel(Queue, ID)

-spec cancel(queue(), job_id()) -> {ok, job()} | {error, {invalid_transition, term()}}.

Cancels a job, preventing further execution.

Only jobs that have not yet started executing can be cancelled. A job already in the executing state returns {error, {invalid_transition, {executing, cancelled}}}.
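
A caller will usually want to distinguish the successful case from the invalid transition described above. The following sketch shows one way to do that; the module cancel_example and its return atoms are hypothetical, and only gaffer:cancel/2 and its documented return shapes come from this page.

```erlang
-module(cancel_example).
-export([try_cancel/2, describe/1]).

%% Attempts to cancel a job and classifies the outcome.
try_cancel(Queue, ID) ->
    describe(gaffer:cancel(Queue, ID)).

%% The job was cancelled; return its id.
describe({ok, #{id := ID}}) ->
    {cancelled, ID};
%% The job had already started executing and cannot be cancelled.
describe({error, {invalid_transition, {executing, cancelled}}}) ->
    still_running;
%% Any other rejected transition is passed through for the caller to inspect.
describe({error, {invalid_transition, Other}}) ->
    {not_cancellable, Other}.
```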

delete(Queue, ID)

-spec delete(queue(), job_id()) -> ok.

Deletes a job.

drain(Queue)

-spec drain(queue()) -> ok.

Equivalent to drain(Queue, 5000).

drain(Queue, Timeout)

-spec drain(queue(), timeout()) -> ok.

Waits for the active workers to finish their jobs.

flush(Queue)

-spec flush(queue()) -> ok.

Equivalent to flush(Queue, infinity).

flush(Queue, Timeout)

-spec flush(queue(), timeout()) -> ok.

Waits for all jobs in the queue to finish.

get(Queue, ID)

-spec get(queue(), job_id()) -> job().

Gets the definition of a job.

insert(Queue, Payload)

-spec insert(queue(), term()) -> job().

Equivalent to insert(Queue, Payload, #{}).

insert(Queue, Payload, Opts)

-spec insert(queue(), term(), job_opts()) -> job().

Inserts a job into a queue.
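
A possible use of the per-job options is sketched below. The module insert_example, its function names, and the option values are hypothetical; the option keys are taken from job_opts(), and the returned job is assumed to be the job() map documented above.

```erlang
-module(insert_example).
-export([urgent_opts/0, enqueue_urgent/2]).

%% Per-job options for work that should be processed ahead of other jobs.
urgent_opts() ->
    #{priority => 10,      %% higher values are processed first
      max_attempts => 3,   %% at most three execution attempts
      timeout => 30000}.   %% execution timeout in milliseconds

%% Inserts Payload into Queue with the options above and returns the job id.
enqueue_urgent(Queue, Payload) ->
    #{id := ID} = gaffer:insert(Queue, Payload, urgent_opts()),
    ID.
```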

list(Queue)

-spec list(queue()) -> [job()].

Lists all jobs in the given queue.

Warning

This operation may scan every job in the queue. On large queues it can take a long time and block other queue operations.

list(Queue, Filters)

-spec list(queue(), job_filter()) -> [job()].

Lists jobs in the given queue matching the filter options.

Warning

This operation may scan every job in the queue. On large queues it can take a long time and block other queue operations.
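
For ad-hoc inspection, the filter can be combined with a small fold over the returned jobs. The module list_example and both function names below are hypothetical; only gaffer:list/2, the state filter key, and the state field of job() come from this page.

```erlang
-module(list_example).
-export([failed_jobs/1, count_by_state/1]).

%% Lists only the jobs that are in the failed state.
failed_jobs(Queue) ->
    gaffer:list(Queue, #{state => failed}).

%% Counts jobs per state, e.g. for the result of gaffer:list/1.
count_by_state(Jobs) ->
    lists:foldl(
        fun(#{state := State}, Acc) ->
            %% Increment the counter for State, starting at 1.
            maps:update_with(State, fun(N) -> N + 1 end, 1, Acc)
        end,
        #{},
        Jobs).
```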

prune(Queue)

-spec prune(queue()) -> [job_id()].

Triggers an immediate prune of stale jobs in the given queue.

Queue Management

create_queue(Conf)

-spec create_queue(queue_conf()) -> ok | {error, already_exists}.

Creates a new queue.

delete_queue(Name)

-spec delete_queue(queue()) -> ok.

Deletes a queue.

delete_queue(Name, Driver)

-spec delete_queue(queue(), gaffer_driver:driver()) -> ok.

Deletes a queue using an explicit driver.

Also works for orphaned queues not initialized at runtime, as returned by orphaned_queues/1.

ensure_queue(Conf)

-spec ensure_queue(queue_conf()) -> ok.

Creates a queue or updates it if it already exists.

get_queue(Name)

-spec get_queue(queue()) -> queue_conf().

Gets the configuration of a queue.

info(Queue)

-spec info(queue()) -> queue_info().

Returns current queue runner information.

Reports the queue status (active or paused), the number of active workers, and the local and global worker limits.

For other metrics, observe lifecycle events via hooks. See gaffer_hooks for collecting metrics off the event stream, or list/2 for ad-hoc inspection.
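
The queue_info() map can be pattern-matched directly. The sketch below estimates how many more workers could start locally; the module info_example and the function local_headroom/1 are hypothetical, and it assumes that the active count is measured against the local limit, which this page does not state.

```erlang
-module(info_example).
-export([local_headroom/1]).

%% Returns the number of additional workers that could start under the
%% local limit, or infinity when the local limit is unbounded.
%% Assumption: the active count is compared against the local limit.
local_headroom(#{workers := #{max := #{local := infinity}}}) ->
    infinity;
local_headroom(#{workers := #{active := Active, max := #{local := Max}}}) ->
    max(0, Max - Active).
```

It would be called as info_example:local_headroom(gaffer:info(Queue)).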

list_queues()

-spec list_queues() -> [{queue(), queue_conf()}].

Lists all queues.

orphaned_queues(Driver)

-spec orphaned_queues(gaffer_driver:driver()) -> [queue()].

Lists queues in storage that are not in the runtime configuration.

pause(Queue)

-spec pause(queue()) -> ok | {error, already_paused}.

Pauses a queue.

Running jobs continue to completion. While paused, the queue does not claim new jobs and pruning is suspended until resume/1 is called. Explicit calls to prune/1 still run while the queue is paused.

Returns {error, already_paused} if the queue is already paused.

resume(Queue)

-spec resume(queue()) -> ok | {error, already_active}.

Resumes a paused queue.

Job claiming and pruning resume normally.

Returns {error, already_active} if the queue is not currently paused.

update_queue(Name, Updates)

-spec update_queue(queue(), queue_updates()) -> ok.

Updates the configuration of a queue.

Accepts any subset of queue_conf/0 keys except name and driver. The update is deep-merged into the current configuration, so nested fields can be tweaked individually (e.g. prune => #{interval => 1000} keeps the existing prune.max_age).

Deep-merge means a key cannot be removed from a nested map via update (e.g. dropping the failed entry from forward, or replacing prune.max_age wholesale). Use ensure_queue/1 with the full configuration for that. Lists, including hooks, are replaced wholesale; passing hooks => [] clears them.

Swapping worker is not atomic: in-flight jobs finish under the previous module, and the new module is picked up on the next poll. The worker and hooks values are not type-validated at update time.

Passing name or driver raises {invalid_queue_conf, #{not_updatable => [...]}}.
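
The merge rules above can be modelled with a short recursive function over maps. This is an illustrative sketch of the documented semantics, not gaffer's actual implementation; the module deep_merge_example and the function deep_merge/2 are hypothetical.

```erlang
-module(deep_merge_example).
-export([deep_merge/2]).

%% Merges Updates into Base. Nested maps are merged key by key; every
%% other value, including lists, replaces the old value wholesale.
deep_merge(Base, Updates) when is_map(Base), is_map(Updates) ->
    maps:fold(
        fun(Key, New, Acc) ->
            case maps:find(Key, Acc) of
                {ok, Old} when is_map(Old), is_map(New) ->
                    %% Both sides are maps: recurse so untouched keys survive.
                    Acc#{Key := deep_merge(Old, New)};
                _ ->
                    %% New key, or a non-map value: replace outright.
                    Acc#{Key => New}
            end
        end,
        Base,
        Updates).
```

With a current configuration of #{prune => #{interval => 60000, max_age => #{completed => 3600000}}}, merging #{prune => #{interval => 1000}} changes only the interval and keeps max_age. Because keys are only ever added or overwritten, no update can remove a key from a nested map.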