Antikythera.AsyncJob behaviour (antikythera v0.5.1)
Antikythera's "async job" functionality allows gears to run their code in the background.
This module is the interface through which gear implementations define and register their own async jobs.
Usage
Preparing your async job module
Each gear can define multiple async job modules. Those modules must use this module as in the following example:
```elixir
defmodule YourGear.SomeAsyncJob do
  use Antikythera.AsyncJob

  # run/3 callback is required
  @impl true
  def run(_payload, _metadata, _context) do
    # do something here
  end

  # abandon/3 callback is optional; you can omit it
  @impl true
  def abandon(_payload, _metadata, _context) do
    # cleanup code when all retries failed
  end

  # inspect_payload/1 callback is optional; you can omit it
  @impl true
  def inspect_payload(payload) do
    # convert a payload to a short string that can be used to identify a job in logs
  end
end
```
Implementations of run/3 can return any value; it is simply ignored.
If execution of run/3 terminates abnormally (exception, timeout, etc.), the job is regarded as failed.
Failed jobs are automatically retried up to a specified number of attempts (see below).
The optional abandon/3 callback is called when a job is abandoned after all attempts have failed.
You can put your cleanup logic in this callback when, e.g., you use an external storage system to store job information.
Note that abandon/3 must finish within 10 seconds; if it takes longer, antikythera stops the execution of abandon/3 in the middle.
The optional inspect_payload/1 callback is solely for logging purposes.
By providing a concrete implementation of this callback you can additionally include a summary of each job's payload in logs.
As an example, suppose your payload contains an "id" field; then it's natural to define the following inspect_payload/1:
```elixir
def inspect_payload(%{"id" => id}) do
  id
end
```
Your gear can have multiple modules that use Antikythera.AsyncJob.
Registering jobs
With the above module defined, you can call register/3 to invoke a new job:
```elixir
YourGear.SomeAsyncJob.register(%{"arbitrary" => "map"}, context, options)
```
Here the first argument is an arbitrary map that is passed to the run/3 callback implementation (note that structs are maps and thus usable as payloads).
context is a value of Antikythera.Context.t/0 and is used to obtain the executor pool to which this job is registered.
When you need to register a job to an executor pool that is not the current one, you can pass an Antikythera.ExecutorPool.Id.t/0 instead of an Antikythera.Context.t/0.
options must be a Keyword.t/0 which can include the following values:
- id: An ID of the job. If given, it must match the regex pattern ~r/\A[0-9A-Za-z_-]{1,32}\z/ and must be unique in the job queue specified by the second argument (context). If not given, antikythera automatically generates one for you.
- schedule: When to run the job, as a 2-tuple. If not given, it defaults to {:once, Antikythera.Time.now()}, i.e., the job will be executed as soon as an available worker process is found in the specified executor pool. Allowed value formats are:
    - {:once, Antikythera.Time.t}: The job is executed at the given time. After the job is either successfully completed or abandoned by failure(s), the job is removed from the job queue. The time must be a future time and within 50 days from now.
    - {:cron, Antikythera.Cron.t}: The job is repeatedly executed at the given cron schedule. After the job is either successfully completed or abandoned by failure(s), the job is requeued to the job queue. Note that the next start time is computed from the time of requeuing. For example, if a job is scheduled every 10 minutes ("*/10 * * * *") and its execution takes 15 minutes to complete, then the job will in effect run every 20 minutes. The schedule repeats indefinitely; when you are done with the job, you can remove it by cancel/3.
- max_duration: Maximum execution time (in milliseconds) of the job. A job which has been running for more than max_duration is brutally killed and, if it has remaining attempts, it will be retried. Defaults to 300000 (5 minutes). If explicitly given, it cannot exceed 1800000 (30 minutes).
- attempts: A positive integer (within 1..10) up to which antikythera tries to run the job. Defaults to 3.
- retry_interval: A 2-tuple of integer and float used to calculate the time interval between retries. Defaults to {5000, 2.0}. The first element is the time interval (in milliseconds) between the 1st and 2nd attempts and must be within 0..300000. The second element is a multiplying factor for exponential backoff and must be within 1.0..5.0. For example:
    - When retry_interval: {10_000, 2.0} is given,
        - the 2nd attempt (1st retry) after failure of the 1st attempt is started in 10 s,
        - the 3rd attempt (2nd retry) after failure of the 2nd attempt is started in 20 s,
        - the 4th attempt (3rd retry) after failure of the 3rd attempt is started in 40 s,
        - and so on.
    - If you want a constant interval, specify 1.0 as the second element.
- bypass_job_queue: Whether or not to run the job immediately, skipping addition to the job queue. Defaults to false.
    - When this option is true, register/3 tries to pick a worker process in the executor pool in the same node and then immediately starts the job using that worker process. If all processes in the local executor pool are busy, register/3 returns {:error, :no_available_workers}. In that case you have 2 options:
        - give up running the job, or
        - call register/3 again without the bypass_job_queue option so that the job will be executed afterward, possibly in another node.
    - Execution with the bypass_job_queue option takes advantage of being free from rate limiting (see below) and of having no overhead from pushing to the distributed job queue. Please note that if you bypass the job queue
        - you cannot specify the start time of the job (it's started immediately and only once), and
        - the job is not retried when its execution results in a failure.
    - Therefore this option cannot be used with schedule, attempts or retry_interval. Note also that jobs running with bypass_job_queue: true cannot be inspected by Antikythera.AsyncJob.list/1 or Antikythera.AsyncJob.status/2.
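The constraints in the option list above can be checked locally before calling register/3. The following is a minimal sketch of such a pre-flight check; the module OptionCheckSketch and its error atoms are hypothetical and not part of Antikythera, the lower bound of 1 ms for max_duration is an assumption, and uniqueness of id within the job queue cannot be checked this way.

```elixir
defmodule OptionCheckSketch do
  # Hypothetical pre-flight check mirroring the documented register/3 option
  # constraints. Not part of Antikythera; returns :ok or {:error, reason}.
  def validate(opts) when is_list(opts) do
    with :ok <- check_id(Keyword.get(opts, :id)),
         :ok <- check_max_duration(Keyword.get(opts, :max_duration)),
         :ok <- check_attempts(Keyword.get(opts, :attempts)),
         :ok <- check_retry_interval(Keyword.get(opts, :retry_interval)),
         :ok <- check_bypass(opts) do
      :ok
    end
  end

  # id: optional; 1 to 32 characters from [0-9A-Za-z_-]
  defp check_id(nil), do: :ok

  defp check_id(id) when is_binary(id) do
    if Regex.match?(~r/\A[0-9A-Za-z_-]{1,32}\z/, id), do: :ok, else: {:error, :invalid_id}
  end

  defp check_id(_), do: {:error, :invalid_id}

  # max_duration: milliseconds, at most 30 minutes (lower bound of 1 ms is an assumption)
  defp check_max_duration(nil), do: :ok
  defp check_max_duration(ms) when is_integer(ms) and ms >= 1 and ms <= 1_800_000, do: :ok
  defp check_max_duration(_), do: {:error, :invalid_max_duration}

  # attempts: 1..10
  defp check_attempts(nil), do: :ok
  defp check_attempts(n) when is_integer(n) and n >= 1 and n <= 10, do: :ok
  defp check_attempts(_), do: {:error, :invalid_attempts}

  # retry_interval: {0..300_000 ms, factor within 1.0..5.0}
  defp check_retry_interval(nil), do: :ok

  defp check_retry_interval({base, factor})
       when is_integer(base) and base >= 0 and base <= 300_000 and
              is_float(factor) and factor >= 1.0 and factor <= 5.0,
       do: :ok

  defp check_retry_interval(_), do: {:error, :invalid_retry_interval}

  # bypass_job_queue: true cannot be combined with schedule, attempts or retry_interval
  defp check_bypass(opts) do
    has_conflict = Enum.any?([:schedule, :attempts, :retry_interval], &Keyword.has_key?(opts, &1))

    if Keyword.get(opts, :bypass_job_queue, false) == true and has_conflict do
      {:error, :bypass_conflict}
    else
      :ok
    end
  end
end
```

The checks are ordered like the option list; the first failing one determines the returned reason.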
register/3 returns a tuple of {:ok, Antikythera.AsyncJob.Id.t} on success.
Each job queue can hold up to 1000 jobs.
When 1000 jobs remain unfinished in the job queue, trying to register a new job results in {:error, :full}.
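The bypass_job_queue fallback described in the option list (try a local worker first, then go through the job queue) can be written as a small wrapper. This is a sketch under assumptions: register_fun is a hypothetical stand-in for YourGear.SomeAsyncJob.register/3, injected so that the example runs without Antikythera, and opts is assumed not to contain schedule, attempts or retry_interval, which cannot be combined with bypass_job_queue. Other errors, such as {:error, :full}, are returned unchanged.

```elixir
defmodule RegisterFallbackSketch do
  # register_fun: hypothetical stand-in for YourGear.SomeAsyncJob.register/3,
  # i.e. fn payload, context, opts -> {:ok, id} | {:error, reason} end.
  # opts must not contain :schedule, :attempts or :retry_interval.
  def register_preferring_local(register_fun, payload, context, opts \\ []) do
    case register_fun.(payload, context, Keyword.put(opts, :bypass_job_queue, true)) do
      {:ok, id} ->
        {:local, id}

      {:error, :no_available_workers} ->
        # All local workers are busy: go through the distributed job queue instead,
        # so the job runs afterward, possibly on another node.
        case register_fun.(payload, context, Keyword.delete(opts, :bypass_job_queue)) do
          {:ok, id} -> {:queued, id}
          error -> error
        end

      error ->
        error
    end
  end
end
```

Whether falling back is appropriate depends on the job; the other documented choice is simply to give up.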
The example call to register/3 above will eventually invoke YourGear.SomeAsyncJob.run/3 with the given payload and options.
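The timing rules for retry_interval and {:cron, ...} schedules in the option list can be checked with plain arithmetic. The sketch below is not Antikythera code: TimingSketch is hypothetical, rounding delays to whole milliseconds is an assumption, and the cron model only handles a "*/step" minute field, with the next start assumed to be the first slot strictly after the requeue time.

```elixir
defmodule TimingSketch do
  # Delays (ms) before each retry for retry_interval {base_ms, factor}:
  # the k-th retry waits base_ms * factor^(k-1). Rounding is an assumption.
  def retry_delays(_retry_interval, attempts) when attempts <= 1, do: []

  def retry_delays({base_ms, factor}, attempts) do
    for k <- 0..(attempts - 2), do: round(base_ms * :math.pow(factor, k))
  end

  # Next start (in minutes) of a "*/step" schedule for a job requeued at
  # requeued_at; assumed to be the first slot strictly after requeued_at.
  def next_cron_start(step, requeued_at), do: (div(requeued_at, step) + 1) * step

  # Start times of the first `count` runs when every run takes `duration` minutes
  # and the next start is computed from the requeue (= finish) time.
  def simulate_starts(step, duration, count) do
    Stream.iterate(0, fn start -> next_cron_start(step, start + duration) end)
    |> Enum.take(count)
  end
end
```

With the defaults ({5000, 2.0} and 3 attempts), retry_delays/2 gives retries after 5 s and 10 s, and simulate_starts(10, 15, 3) reproduces the "every 20 minutes" behavior of the cron example.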
Rate limiting of accesses to each job queue
Antikythera's async job queues are distributed, fault tolerant, persistent data structures. These features come at an extra cost: interacting with a job queue is a relatively expensive operation. In order not to overwhelm a job queue, accesses to a job queue are rate-limited by the token bucket algorithm. Rate limiting is imposed on a per-node basis; in each node, there exists a bucket per job queue.
Each bucket contains up to 30 tokens and one token is generated every 500 milliseconds. The following operations take tokens from the corresponding bucket:
When there are not enough tokens in the bucket, register/3 and cancel/3 return {:error, {:rate_limit_reached, milliseconds_to_wait}}.
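Because register/3 and cancel/3 report {:error, {:rate_limit_reached, milliseconds_to_wait}} instead of waiting, a caller may want to wait and retry. A minimal sketch, assuming the call is wrapped in a zero-arity function; RateLimitRetrySketch is hypothetical, and the sleep function is injectable so that tests need not actually sleep.

```elixir
defmodule RateLimitRetrySketch do
  # Runs `fun` (e.g. fn -> YourGear.SomeAsyncJob.register(payload, context, opts) end).
  # While it returns {:error, {:rate_limit_reached, ms}}, sleeps `ms` milliseconds and
  # tries again, making at most `max_tries` calls in total. Any other result is returned as is.
  def call_with_retry(fun, max_tries \\ 3, sleep_fun \\ &Process.sleep/1) do
    case fun.() do
      {:error, {:rate_limit_reached, ms}} when max_tries > 1 ->
        sleep_fun.(ms)
        call_with_retry(fun, max_tries - 1, sleep_fun)

      other ->
        other
    end
  end
end
```

Blocking the calling process while waiting is a design choice; a web request handler, for example, might prefer to return an error to its client instead.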
On the other hand, status/2 and list/1 automatically wait and retry several times when hitting the rate limit.
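The per-node, per-queue token bucket described above can be modeled as follows to reason about bursts. This is a simplified model, not Antikythera's implementation: TokenBucketSketch is hypothetical, tokens are added in whole 500 ms steps, time is passed explicitly in milliseconds, and the number of tokens an operation costs is passed in as a parameter.

```elixir
defmodule TokenBucketSketch do
  @capacity 30
  @refill_ms 500

  # A full bucket created at time now_ms (milliseconds).
  def new(now_ms), do: %{tokens: @capacity, updated_at: now_ms}

  # Tries to take `cost` tokens (1..30) at now_ms. Returns {:ok, bucket} or
  # {:error, {:rate_limit_reached, ms_to_wait}, bucket}.
  def take(bucket, cost, now_ms) when is_integer(cost) and cost >= 1 and cost <= @capacity do
    bucket = refill(bucket, now_ms)

    if bucket.tokens >= cost do
      {:ok, %{bucket | tokens: bucket.tokens - cost}}
    else
      # Time until enough whole tokens have been generated.
      wait = (cost - bucket.tokens) * @refill_ms - (now_ms - bucket.updated_at)
      {:error, {:rate_limit_reached, wait}, bucket}
    end
  end

  # Adds one token per full 500 ms elapsed, capped at the capacity.
  defp refill(%{tokens: tokens, updated_at: updated_at}, now_ms) do
    generated = div(now_ms - updated_at, @refill_ms)
    new_tokens = min(@capacity, tokens + generated)

    # Keep the partial interval only while the bucket is not full.
    new_updated_at =
      if new_tokens == @capacity, do: now_ms, else: updated_at + generated * @refill_ms

    %{tokens: new_tokens, updated_at: new_updated_at}
  end
end
```

In this model a node can spend 30 tokens in a burst on a given job queue, after which it regains 2 tokens per second.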
During testing, the rate limiting feature may disrupt smooth test execution.
To bypass rate limiting in your tests, you can use Antikythera.Test.AsyncJobHelper.reset_rate_limit_status/1.
Payload
Payloads are registered by calling register/3 and used in run/3, abandon/3 or inspect_payload/1.
Payloads are compressed into binaries when register/3 is called and kept in memory until the job is executed, so large payloads could degrade overall system performance.
Please avoid including large data in payloads; instead, put the ID of the data in the payload and fetch the whole data in run/3.
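To see why an ID is preferable to the data itself, one can compare encoded payload sizes. This sketch assumes a payload is encoded with :erlang.term_to_binary/2 using the :compressed option; Antikythera's actual encoding may differ, and PayloadSizeSketch and the field names are hypothetical.

```elixir
defmodule PayloadSizeSketch do
  # Approximate bytes kept in memory for a payload, ASSUMING it is stored as
  # :erlang.term_to_binary(payload, [:compressed]).
  def encoded_size(payload) when is_map(payload) do
    payload
    |> :erlang.term_to_binary([:compressed])
    |> byte_size()
  end
end

# Hypothetical payloads: embedding the data vs. referring to it by ID.
rows = Enum.map(1..20_000, fn _ -> :rand.uniform(1_000_000) end)
with_data = %{"report_id" => "r_123", "rows" => rows}
id_only = %{"report_id" => "r_123"}

IO.puts("with data: #{PayloadSizeSketch.encoded_size(with_data)} bytes")
IO.puts("ID only:   #{PayloadSizeSketch.encoded_size(id_only)} bytes")
```

Compression helps little with high-entropy data, so the embedded rows dominate the size; run/3 can instead look the rows up by "report_id".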
Summary
Functions
cancel/3: Cancels an async job registered with the job queue specified by context_or_epool_id.
list/1: Retrieves the list of async jobs registered with the job queue specified by context_or_epool_id.
status/2: Retrieves detailed information of an async job registered with the job queue specified by context_or_epool_id.
Types
@type option() :: {:id, Antikythera.AsyncJob.Id.t()} | {:schedule, Antikythera.AsyncJob.Schedule.t()} | {:max_duration, Antikythera.AsyncJob.MaxDuration.t()} | {:attempts, Antikythera.AsyncJob.Attempts.t()} | {:retry_interval, Antikythera.AsyncJob.RetryInterval.t()} | {:bypass_job_queue, boolean()}
Callbacks
@callback abandon(map(), Antikythera.AsyncJob.Metadata.t(), Antikythera.Context.t()) :: any()
@callback run(map(), Antikythera.AsyncJob.Metadata.t(), Antikythera.Context.t()) :: any()
Functions
@spec cancel( Antikythera.GearName.t(), Antikythera.Context.t() | Antikythera.ExecutorPool.Id.t(), Antikythera.AsyncJob.Id.t() ) :: :ok | {:error, :not_found | Antikythera.ExecutorPool.BadIdReason.t() | {:rate_limit_reached, pos_integer()}}
Cancels an async job registered with the job queue specified by context_or_epool_id.
Note that currently running job executions cannot be cancelled.
However, calling cancel/3 with a currently running job's job_id prevents retries of the job if it fails.
@spec list(Antikythera.Context.t() | Antikythera.ExecutorPool.Id.t()) :: [ {Antikythera.Time.t(), Antikythera.AsyncJob.Id.t(), Antikythera.AsyncJob.StateLabel.t()} ]
Retrieves the list of async jobs registered with the job queue specified by context_or_epool_id.
Each element of the returned list is a 3-tuple: scheduled execution time, job ID and current status. The returned list is already sorted.
@spec status( Antikythera.Context.t() | Antikythera.ExecutorPool.Id.t(), Antikythera.AsyncJob.Id.t() ) :: Croma.Result.t(Antikythera.AsyncJob.Status.t())
Retrieves detailed information of an async job registered with the job queue specified by context_or_epool_id.