Rihanna v2.3.1 Rihanna.Job behaviour

A behaviour for Rihanna jobs.

You must implement Rihanna.Job.perform/1 in your job, and it must return one of the following values:

  • :ok
  • {:ok, result}
  • :error
  • {:error, reason}

You can define your job like the example below:

defmodule MyApp.MyJob do
  @behaviour Rihanna.Job

  # NOTE: `perform/1` is a required callback. It takes exactly one argument. To
  # pass multiple arguments, wrap them in a list and destructure in the
  # function head as in this example
  def perform([arg1, arg2]) do
    success? = do_some_work(arg1, arg2)

    if success? do
      # job completed successfully
      :ok
    else
      # job execution failed
      {:error, :failed}
    end
  end
end
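Because perform/1 accepts any single term, a map is another way to pass several named values. The following is a minimal sketch, not part of Rihanna: the module name MyApp.ResizeJob and the :width and :height keys are hypothetical, and the behaviour declaration is commented out so the snippet compiles without the Rihanna dependency.

```elixir
defmodule MyApp.ResizeJob do
  # In a project that depends on Rihanna, declare the behaviour here:
  # @behaviour Rihanna.Job

  # A map keeps the single argument self-describing. The guards reject
  # anything that is not a pair of positive integers.
  def perform(%{width: w, height: h})
      when is_integer(w) and is_integer(h) and w > 0 and h > 0 do
    # Return a result alongside :ok
    {:ok, w * h}
  end

  # Any other argument is reported as a failure with a reason.
  def perform(other) do
    {:error, {:invalid_args, other}}
  end
end
```

Calling MyApp.ResizeJob.perform(%{width: 2, height: 3}) directly returns {:ok, 6}, which makes the job easy to exercise in a unit test without a queue.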

This behaviour lets you tailor what happens after your job fails or raises an exception.

You can define an after_error/2 callback, which runs before the job is placed on the failed job queue.

If you don't define this callback, the job is added to the failed job queue without running anything.

If you wish to re-enqueue a job to run at a different time, return {:reenqueue, due_at}, where due_at is a DateTime timestamp.

def after_error(failure_reason, args) do
  notify_someone(__MODULE__, failure_reason, args)
end
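According to the callback spec, the first argument of after_error/2 is {:error, reason}, :error, or an exception struct. As a rough sketch of how those three shapes might be told apart, the hypothetical helper below (MyApp.FailureReport is not part of Rihanna) turns each into a readable message that an after_error/2 implementation could pass on to a notifier.

```elixir
defmodule MyApp.FailureReport do
  # Hypothetical helper: describes the failure passed to after_error/2.

  # The job returned {:error, reason}
  def describe({:error, reason}), do: "failed: " <> inspect(reason)

  # The job returned a bare :error
  def describe(:error), do: "failed without a reason"

  # The job raised; every exception struct carries __exception__: true
  def describe(%{__exception__: true} = exception) do
    "raised: " <> Exception.message(exception)
  end
end
```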

You can define a retry_at/3 callback. Returning {:ok, due_at} schedules the job to run again at that time. Returning :noop (the default) proceeds with the normal job failure behaviour. The value of attempts counts up from 0, so that a backed-off due_at can be calculated.

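# Retry while fewer than 3 attempts have been made, waiting 5 seconds
# longer each time (attempts counts up from 0, so the first delay is 0s).
def retry_at(_failure_reason, _args, attempts) when attempts < 3 do
  due_at = DateTime.add(DateTime.utc_now(), attempts * 5, :second)
  {:ok, due_at}
end

# Give up and fall through to the normal failure handling.
def retry_at(_failure_reason, _args, _attempts) do
  # `warn/1` stands in for your own logging function
  warn("Job failed after 3 attempts")
  :noop
end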
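The example above backs off linearly. If an exponential backoff is preferred, one possible sketch is shown here. The module name MyApp.Backoff, the 5-second base, and the limit of 5 attempts are all assumptions chosen for illustration. In a real job the two retry_at/3 clauses would live in the job module itself.

```elixir
defmodule MyApp.Backoff do
  # Assumed values for illustration only
  @base_seconds 5
  @max_attempts 5

  # Delay doubles with each attempt: 5s, 10s, 20s, 40s, 80s
  def delay_seconds(attempts) do
    @base_seconds * round(:math.pow(2, attempts))
  end

  # Schedule another run while under the attempt limit
  def retry_at(_failure_reason, _args, attempts) when attempts < @max_attempts do
    due_at = DateTime.add(DateTime.utc_now(), delay_seconds(attempts), :second)
    {:ok, due_at}
  end

  # Otherwise let the job fail normally
  def retry_at(_failure_reason, _args, _attempts), do: :noop
end
```

Keeping the delay calculation in its own function makes the schedule testable without depending on the current time.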

You can define a priority/0 callback, which is called if no priority is set when a job is enqueued. It should return a single integer. A priority of 1 is the highest, and jobs with larger numbers run later. If you don't define this callback, the priority defaults to 50 so that the job runs at a low priority.

def priority(), do: 2

Types

t()

t() :: %Rihanna.Job{
  due_at: term(),
  enqueued_at: term(),
  fail_reason: term(),
  failed_at: term(),
  id: term(),
  priority: term(),
  rihanna_internal_meta: term(),
  term: term()
}

Functions

after_error(job_module, reason, arg)

Checks whether a job implemented the after_error callback and runs it if it does.

delete(job_id, opts \\ [])

mark_reenqueued(pg, job, due_at)

mark_retried(pg, job, due_at)

Updates attempts and sets the due_at datetime.

Checks whether a job implements the priority callback and runs it if it does.

A lower value means a higher job priority. The default is 50, a very low priority, so that when no priority is set, new jobs do not run before higher-priority jobs.
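To illustrate the "check for an optional callback, otherwise use the default" pattern described here, the sketch below shows one way such a lookup could be written. It is not Rihanna's actual implementation. MyApp.PrioritySketch, priority_for/1, and the two job modules are hypothetical names.

```elixir
defmodule MyApp.PrioritySketch do
  # Default taken from the documentation above
  @default_priority 50

  def priority_for(job_module) do
    # Make sure the module is loaded before asking what it exports
    Code.ensure_loaded(job_module)

    if function_exported?(job_module, :priority, 0) do
      job_module.priority()
    else
      @default_priority
    end
  end
end

# A job that defines the optional callback
defmodule MyApp.UrgentJob do
  def perform(_arg), do: :ok
  def priority(), do: 2
end

# A job that relies on the default
defmodule MyApp.PlainJob do
  def perform(_arg), do: :ok
end
```

With these definitions, MyApp.PrioritySketch.priority_for(MyApp.UrgentJob) returns 2 and MyApp.PrioritySketch.priority_for(MyApp.PlainJob) returns 50.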

retry_at(job_module, reason, arg, attempts)

Determines when a job should be retried.

The name of the jobs table.

Callbacks

after_error(arg1, arg) (optional)

after_error({:error, reason()} | :error | Exception.t(), arg()) :: any()

perform(arg)

perform(arg :: any()) :: :ok | {:ok, result()} | :error | {:error, reason()}

priority() (optional)

priority() :: pos_integer()

retry_at(arg1, arg, pos_integer) (optional)

retry_at({:error, reason()} | :error | Exception.t(), arg(), pos_integer()) ::
  {:ok, DateTime.t()} | :noop