EctoJob.JobQueue behaviour (ecto_job v3.1.0)

Mixin for defining an Ecto schema for a Job Queue table in the database.


  • :table_name - (required) The name of the job table in the database.
  • :schema_prefix - (optional) The schema prefix for the table.
  • :timestamps_opts - (optional) Configures the timestamp fields for the schema (See Ecto.Schema.timestamps/1)


defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs", schema_prefix: "my_app"

  @spec perform(Ecto.Multi.t(), map()) :: any()
  def perform(multi, job = %{}) do

A job Ecto.Schema struct.

An Ecto.Repo module name

An Ecto.Schema module name

Job State enumeration


Updates all jobs in the "RESERVED" or "IN_PROGRESS" state with expires time <= now to "AVAILABLE" state.

Updates all jobs in the "SCHEDULED" and "RETRY" state with scheduled time <= now to "AVAILABLE" state.

Builds a query for a batch of available jobs.

Creates an Ecto.Changeset that will delete a job, confirming that the attempt counter hasn't been increased by another worker process.

Updates all jobs that have been attempted the maximum number of times to "FAILED".

Computes the expiry time for an "IN_PROGRESS" and schedule time of "RETRY" jobs based on the current time and attempt counter.

Creates an Ecto.Multi struct with the command to delete the given job from the queue.

Transitions a job from "IN_PROGRESS" to "RETRY" or "FAILED" after execution failure. If the job has exceeded the configuredmax_attemptsthe state will move to "FAILED", otherwise the state is transitioned to"RETRY"` and changes the schedule time so the job will be picked up again.

Computes the expiry time for a job reservation to be held, given the current time.

Updates a batch of jobs in "AVAILABLE" state to "RESERVED" state with a timeout.

Transitions a job from "RESERVED" to "IN_PROGRESS".


Job execution callback to be implemented by each JobQueue module.

job() :: %module(){
  __meta__: Ecto.Schema.Metadata.t(),
  id: integer() | nil,
  state: state(),
  expires: DateTime.t() | nil,
  schedule: DateTime.t() | nil,
  attempt: integer(),
  max_attempts: integer() | nil,
  params: params(),
  notify: String.t() | nil,
  priority: integer(),
  inserted_at: DateTime.t() | nil,
  updated_at: DateTime.t() | nil


params() :: map() | any()

A job Ecto.Schema struct.


repo() :: module()

An Ecto.Repo module name


schema() :: module()

An Ecto.Schema module name


state() :: String.t()

Job State enumeration

  • "SCHEDULED": The job is scheduled to run at a future time
  • "AVAILABLE": The job is availble to be run by the next available worker
  • "RESERVED": The job has been reserved by a worker for execution
  • "IN_PROGRESS": The job is currently being worked
  • "RETRY": The job has failed and it's waiting for a retry
  • "FAILED": The job has exceeded the max_attempts and will not be retried again

activate_expired_jobs(repo, schema, now)

activate_expired_jobs(repo(), schema(), DateTime.t()) :: integer()

Updates all jobs in the "RESERVED" or "IN_PROGRESS" state with expires time <= now to "AVAILABLE" state.

Returns the number of jobs updated.

activate_scheduled_jobs(repo, schema, now)

activate_scheduled_jobs(repo(), schema(), DateTime.t()) :: integer()

Updates all jobs in the "SCHEDULED" and "RETRY" state with scheduled time <= now to "AVAILABLE" state.

Returns the number of jobs updated.

available_jobs(schema, demand)

available_jobs(schema(), integer()) :: Ecto.Query.t()

Builds a query for a batch of available jobs.

The batch size is determined by demand

delete_job_changeset(job()) :: Ecto.Changeset.t()

Creates an Ecto.Changeset that will delete a job, confirming that the attempt counter hasn't been increased by another worker process.

fail_expired_jobs_at_max_attempts(repo, schema, now)

fail_expired_jobs_at_max_attempts(repo(), schema(), DateTime.t()) :: integer()

Updates all jobs that have been attempted the maximum number of times to "FAILED".

Returns the number of jobs updated.

increase_time(now, attempt, timeout_ms)

increase_time(DateTime.t(), integer(), integer()) :: DateTime.t()

Computes the expiry time for an "IN_PROGRESS" and schedule time of "RETRY" jobs based on the current time and attempt counter.


initial_multi(job()) :: Ecto.Multi.t()

Creates an Ecto.Multi struct with the command to delete the given job from the queue.

This will be passed as the first argument to the user supplied callback function.

job_failed(repo, job, now, retry_timeout_ms)

job_failed(repo(), job(), DateTime.t(), integer()) :: {:ok, job()} | :error

Transitions a job from "IN_PROGRESS" to "RETRY" or "FAILED" after execution failure. If the job has exceeded the configuredmax_attemptsthe state will move to "FAILED", otherwise the state is transitioned to"RETRY"` and changes the schedule time so the job will be picked up again.

reservation_expiry(now, timeout_ms)

reservation_expiry(DateTime.t(), integer()) :: DateTime.t()

Computes the expiry time for a job reservation to be held, given the current time.

reserve_available_jobs(repo, schema, demand, now, timeout_ms)

reserve_available_jobs(repo(), schema(), integer(), DateTime.t(), integer()) ::
  {integer(), [job()]}

Updates a batch of jobs in "AVAILABLE" state to "RESERVED" state with a timeout.

The batch size is determined by demand. returns {count, updated_jobs} tuple.

update_job_in_progress(repo, job, now, timeout_ms)

update_job_in_progress(repo(), job(), DateTime.t(), integer()) ::
  {:ok, job()} | {:error, :expired}

Transitions a job from "RESERVED" to "IN_PROGRESS".

Confirms that the job reservation hasn't expired by checking:

  • The attempt counter hasn't been changed
  • The state is still "RESERVED"
  • The expiry time is in the future

Updates the state to "IN_PROGRESS", increments the attempt counter, and sets a timeout proportional to the attempt counter and the expiry_timeout, which defaults to 300_000 ms (5 minutes) unless otherwise configured.

Returns {:ok, job} when sucessful, {:error, :expired} otherwise.

perform(multi :: Ecto.Multi.t(), params :: params()) ::
  {:ok, any()}
  | {:error, any()}
  | {:error,, any(), %{required( => any()}}

Job execution callback to be implemented by each JobQueue module.

The return type is the same as Ecto.Repo.transaction/1.


@impl true
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)