Antikythera.AsyncJob behaviour (antikythera v0.5.1)

Antikythera's "async job" functionality allows gears to run their code in the background.

This module is the interface for gear implementations to define and register their own async jobs.

Usage

Preparing your async job module

Each gear can define multiple async job modules. Those modules must use this module as in the following example:

defmodule YourGear.SomeAsyncJob do
  use Antikythera.AsyncJob

  # run/3 callback is required
  @impl true
  def run(_payload, _metadata, _context) do
    # do something here
  end

  # abandon/3 callback is optional; you can omit it
  @impl true
  def abandon(_payload, _metadata, _context) do
    # cleanup code when all retries failed
  end

  # inspect_payload/1 callback is optional; you can omit it
  @impl true
  def inspect_payload(payload) do
    # convert a payload to a short string that can be used to identify a job in logs
  end
end

Implementations of run/3 can return any value; it is simply ignored. If execution of run/3 terminated abnormally (exception, timeout, etc.) the job is regarded as failed. Failed jobs are automatically retried up to a specified number of attempts (see below).

The optional abandon/3 callback is called when a job is abandoned after all attempts have failed. You can put your cleanup logic in this callback, e.g. when you use an external storage system to store job information. Note that abandon/3 must finish within 10 seconds; if it takes longer, antikythera stops its execution midway.

The optional inspect_payload/1 callback is solely for logging purposes. By providing a concrete callback implementation you can include a summary of each job's payload in logs. As an example, suppose your payload contains an "id" field; then it's natural to define the following inspect_payload/1:

def inspect_payload(%{"id" => id}) do
  id
end

Your gear can have multiple modules that use Antikythera.AsyncJob.

Registering jobs

With the above module defined, you can call register/3 to invoke a new job:

YourGear.SomeAsyncJob.register(%{"arbitrary" => "map"}, context, options)

Here the first argument is an arbitrary map that is passed to the run/3 callback implementation (note that structs are maps and thus usable as payloads). context is a value of Antikythera.Context.t/0 and is used to obtain the executor pool to which this job is registered. When you need to register a job to an executor pool that is not the current one, you can pass an Antikythera.ExecutorPool.Id.t/0 instead of Antikythera.Context.t/0. options must be a Keyword.t/0 which can include the following values:

  • id: An ID of the job. If given it must match the regex pattern ~r/\A[0-9A-Za-z_-]{1,32}\z/ and must be unique in the job queue specified by the second argument. If not given antikythera automatically generates one for you.
  • schedule: When to run the job as a 2-tuple. If not given it defaults to {:once, Antikythera.Time.now()}, i.e., the job will be executed as soon as an available worker process is found in the specified executor pool. Allowed value format is:
    • {:once, Antikythera.Time.t}
      • The job is executed at the given time. After the job is either successfully completed or abandoned by failure(s), the job is removed from the job queue. The time must be a future time and within 50 days from now.
    • {:cron, Antikythera.Cron.t}
      • The job is repeatedly executed according to the given cron schedule. After the job is either successfully completed or abandoned by failure(s), the job is requeued to the job queue. Note that the next start time is computed from the time of requeuing. For example, if a job is scheduled every 10 minutes ("*/10 * * * *") and its execution takes 15 minutes to complete, then the job will in effect run every 20 minutes. The schedule repeats indefinitely; when you are done with the job you can remove it with cancel/3.
  • max_duration: Maximum execution time (in milliseconds) of the job. A job which has been running for more than max_duration is brutally killed and if it has remaining attempts it will be retried. Defaults to 300000 (5 minutes). If explicitly given it cannot exceed 1800000 (30 minutes).
  • attempts: A positive integer (within 1..10), up to which antikythera tries to run the job. Defaults to 3.
  • retry_interval: A 2-tuple of integer and float to calculate time interval between retries. Defaults to {5000, 2.0}. First element is a time interval (in milliseconds) between 1st and 2nd attempts and must be within 0..300000. Second element is a multiplying factor for exponential backoff and must be within 1.0..5.0. For example:
    • When retry_interval: {10_000, 2.0} is given,
      • 2nd attempt (1st retry) after failure of 1st attempt is started in 10s,
      • 3rd attempt (2nd retry) after failure of 2nd attempt is started in 20s,
      • 4th attempt (3rd retry) after failure of 3rd attempt is started in 40s,
      • and so on.
    • If you want a constant interval, specify 1.0 as the second element.
  • bypass_job_queue: Whether or not to run the job immediately, skipping addition to the job queue. Defaults to false.
    • When this option is true, register/3 tries to pick a worker process in the executor pool in the same node and then immediately start the job using the worker process. If all processes in the local executor pool are busy, register/3 returns {:error, :no_available_workers}. In that case you have 2 options:
      • give up running the job, or
      • call register/3 again without bypass_job_queue option so that the job will be executed afterward, possibly in another node.
    • Execution with the bypass_job_queue option has the advantage of being free from rate limiting (see below) and of incurring no overhead for pushing to the distributed job queue. Please note that if you bypass the job queue
      • you cannot specify the start time of the job (it's started immediately and only once), and
      • the job is not retried when its execution results in a failure.
    • Therefore this option cannot be used with schedule, attempts or retry_interval. Note also that jobs running with bypass_job_queue: true cannot be inspected by Antikythera.AsyncJob.list/1 or Antikythera.AsyncJob.status/2.
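
The arithmetic behind the retry_interval and {:cron, ...} examples above can be checked with a small standalone sketch. It does not call antikythera; the module and function names are made up for illustration, times are plain integers (milliseconds for retries, minutes for the cron case), and the "next slot strictly after the requeue time" rule is an assumption about how the next cron start is chosen.

```elixir
defmodule ScheduleArithmeticSketch do
  # Hypothetical helper module; not part of antikythera.

  # Delays (in milliseconds) before each retry for a job allowed `attempts` runs.
  # With `attempts` runs there can be at most `attempts - 1` retries.
  def retry_delays({base_ms, factor}, attempts) when attempts >= 1 do
    for n <- 0..(attempts - 2)//1 do
      round(base_ms * :math.pow(factor, n))
    end
  end

  # First slot of an "every `period_min` minutes" schedule that lies strictly
  # after the requeue time (assumed rule, see the text above).
  def next_start(requeued_at_min, period_min) do
    (div(requeued_at_min, period_min) + 1) * period_min
  end

  # Start times of `count` consecutive runs when each run takes `duration_min`
  # minutes and the job is requeued as soon as a run finishes.
  def start_times(first_start_min, period_min, duration_min, count) do
    Stream.iterate(first_start_min, fn start ->
      next_start(start + duration_min, period_min)
    end)
    |> Enum.take(count)
  end
end

# retry_interval: {10_000, 2.0} with 4 attempts gives three retry delays.
IO.inspect(ScheduleArithmeticSketch.retry_delays({10_000, 2.0}, 4))

# A 15-minute job on a 10-minute schedule, starting at minute 0.
IO.inspect(ScheduleArithmeticSketch.start_times(0, 10, 15, 4))
```

Under these assumptions the first call yields delays of 10s, 20s and 40s, and the second shows consecutive runs starting 20 minutes apart.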

register/3 returns a tuple of {:ok, Antikythera.AsyncJob.Id.t} on success. You can register up to 1000 jobs. When 1000 jobs remain unfinished in the job queue, trying to register a new job results in {:error, :full}.
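
One way to combine bypass_job_queue with the regular job queue is sketched below: try the local executor pool first and fall back to the queue when no worker is free. RegisterSketch and register_prefer_local/3 are hypothetical names, and job_module is assumed to be any module that uses Antikythera.AsyncJob (so that it exposes register/3 as described above). Other results, such as {:error, :full}, are passed through to the caller unchanged.

```elixir
defmodule RegisterSketch do
  # Hypothetical helper; `job_module` is assumed to `use Antikythera.AsyncJob`.
  def register_prefer_local(job_module, payload, context) do
    case job_module.register(payload, context, bypass_job_queue: true) do
      {:error, :no_available_workers} ->
        # All local workers are busy: register through the job queue instead,
        # so the job runs later, possibly on another node.
        job_module.register(payload, context, [])

      other ->
        # {:ok, job_id} or any other error is returned as-is.
        other
    end
  end
end
```

A caller would then match on the result, for example treating {:ok, job_id} as success and deciding separately what to do on {:error, :full}. Note that the fallback registration goes through the job queue and is therefore subject to rate limiting.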

The example call to register/3 above will eventually invoke YourGear.SomeAsyncJob.run/3 with the given payload and options.

Rate limiting of accesses to each job queue

Antikythera's async job queues are distributed, fault tolerant, persistent data structures. These features come at an extra cost: interacting with a job queue is a relatively expensive operation. In order not to overwhelm a job queue, accesses to a job queue are rate-limited by the token bucket algorithm. Rate limiting is imposed on a per-node basis; in each node, there exists a bucket per job queue.

Each bucket contains up to 30 tokens and one token is generated in every 500 milliseconds. The following operations take tokens from the corresponding bucket:

When there are not enough tokens in the bucket, register/3 and cancel/3 return {:error, {:rate_limit_reached, milliseconds_to_wait}}. On the other hand status/2 and list/1 automatically wait and retry several times when hitting the rate limit.
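
Since register/3 and cancel/3 do not wait on their own, a caller that prefers waiting over failing could wrap them in a small retry helper. The following is a minimal sketch, not something antikythera provides: RateLimitRetrySketch and with_rate_limit_retry/2 are hypothetical names, and the default of 3 retries is an arbitrary choice.

```elixir
defmodule RateLimitRetrySketch do
  # Hypothetical helper. `fun` is a zero-arity function that wraps a call to
  # register/3 or cancel/3 and returns its result.
  def with_rate_limit_retry(fun, retries_left \\ 3) when is_function(fun, 0) do
    case fun.() do
      {:error, {:rate_limit_reached, wait_ms}} when retries_left > 0 ->
        # Sleep for the suggested time, then try again.
        Process.sleep(wait_ms)
        with_rate_limit_retry(fun, retries_left - 1)

      other ->
        # Success, a different error, or retries exhausted: return as-is.
        other
    end
  end
end
```

It could be used as `RateLimitRetrySketch.with_rate_limit_retry(fn -> YourGear.SomeAsyncJob.register(payload, context, []) end)`. Keep in mind that Process.sleep/1 blocks the calling process, which may be undesirable while handling a web request.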

During testing, the rate limiting feature may disrupt smooth test executions. To bypass rate limiting in your tests you can use Antikythera.Test.AsyncJobHelper.reset_rate_limit_status/1.

Payload

Payloads are registered by calling register/3 and used in run/3, abandon/3 or inspect_payload/1. Payloads are compressed as binary when register/3 is called and kept in memory until the job is executed, so large payloads could degrade overall system performance. Please avoid including large data in payloads; instead put the ID of the data in the payload and fetch the data itself in run/3.
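
To get a feel for the difference, the sketch below compares the serialized size of a payload that embeds its data with one that carries only an ID. It uses :erlang.term_to_binary/2 with the :compressed option purely as a stand-in; how antikythera actually compresses payloads is not specified here, and the module name and the payload fields are made up for illustration.

```elixir
defmodule PayloadSizeSketch do
  # Hypothetical helper: size in bytes of a term after compressed serialization.
  # Assumption: term_to_binary/2 only approximates what antikythera stores.
  def compressed_size(payload) do
    payload
    |> :erlang.term_to_binary([:compressed])
    |> byte_size()
  end
end

# 100 kB of random bytes, which barely compress at all.
body = for _ <- 1..100_000, into: <<>>, do: <<:rand.uniform(256) - 1>>

embedded = %{"id" => "doc_1", "body" => body}
id_only = %{"id" => "doc_1"}

IO.puts("payload with body: #{PayloadSizeSketch.compressed_size(embedded)} bytes")
IO.puts("payload with ID:   #{PayloadSizeSketch.compressed_size(id_only)} bytes")
```

With the ID-only payload, run/3 would look the data up by "id" from wherever the gear stores it.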

Types

@type option() ::
  {:id, Antikythera.AsyncJob.Id.t()}
  | {:schedule, Antikythera.AsyncJob.Schedule.t()}
  | {:max_duration, Antikythera.AsyncJob.MaxDuration.t()}
  | {:attempts, Antikythera.AsyncJob.Attempts.t()}
  | {:retry_interval, Antikythera.AsyncJob.RetryInterval.t()}
  | {:bypass_job_queue, boolean()}

Callbacks

Functions

cancel(gear_name, context_or_epool_id, job_id)

Cancels an async job registered with the job queue specified by context_or_epool_id.

Note that currently running job executions cannot be cancelled. However, calling cancel/3 with the job_id of a currently running job prevents retries of that job when it fails.

list(context_or_epool_id)

Retrieves list of async jobs registered with the job queue specified by context_or_epool_id.

Each element of the returned list is a 3-tuple: scheduled execution time, job ID and current status. The returned list is already sorted.

status(context_or_epool_id, job_id)

Retrieves detailed information of an async job registered with the job queue specified by context_or_epool_id.