ecto_job v0.3.0 EctoJob.JobQueue behaviour

Mixin for defining an Ecto schema for a Job Queue table in the database.

Example

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  @spec perform(Ecto.Multi.t(), map()) :: any()
  def perform(multi, job = %{}) do
    ...
  end
end

Types

job()
job() :: %module(){
  id: integer() | nil,
  state: state(),
  expires: DateTime.t() | nil,
  schedule: DateTime.t() | nil,
  attempt: integer(),
  max_attempts: integer() | nil,
  params: map(),
  notify: String.t() | nil,
  inserted_at: DateTime.t() | nil,
  updated_at: DateTime.t() | nil
}

A job Ecto.Schema struct.

repo()

An Ecto.Repo module name.

schema()

An Ecto.Schema module name.

state()

Job State enumeration:

  • "SCHEDULED": The job is scheduled to run at a future time
  • "AVAILABLE": The job is available to be run by the next available worker
  • "RESERVED": The job has been reserved by a worker for execution
  • "IN_PROGRESS": The job is currently being worked
  • "FAILED": The job has exceeded the max_attempts and will not be retried again
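Read together with the functions documented on this page, the states above form a small state machine. The sketch below is illustrative only: JobStateSketch is a hypothetical module, not part of ecto_job, and the source state of the "FAILED" transition is an assumption, since this page does not say which states fail_expired_jobs_at_max_attempts/3 matches.

```elixir
defmodule JobStateSketch do
  @moduledoc "Hypothetical helper that models the job states as a transition table."

  # {from, to, function documented as performing the transition}
  @transitions [
    {"SCHEDULED", "AVAILABLE", :activate_scheduled_jobs},
    {"AVAILABLE", "RESERVED", :reserve_available_jobs},
    {"RESERVED", "IN_PROGRESS", :update_job_in_progress},
    {"RESERVED", "AVAILABLE", :activate_expired_jobs},
    {"IN_PROGRESS", "AVAILABLE", :activate_expired_jobs},
    # Assumption: only expired "IN_PROGRESS" jobs are moved to "FAILED".
    {"IN_PROGRESS", "FAILED", :fail_expired_jobs_at_max_attempts}
  ]

  @doc "Returns the states reachable from `state` in one step."
  def next_states(state) do
    for {^state, to, _via} <- @transitions, do: to
  end

  @doc "Returns true when `to` is reachable from `from` in one step."
  def can_transition?(from, to), do: to in next_states(from)
end
```

There is no terminal state for a job that completes: according to initial_multi/1, a finished job is deleted from the queue instead.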

Functions

activate_expired_jobs(repo, schema, now)
activate_expired_jobs(repo(), schema(), DateTime.t()) :: integer()

Updates all jobs in the "RESERVED" or "IN_PROGRESS" state with expires time <= now to "AVAILABLE" state.

Returns the number of jobs updated.

activate_scheduled_jobs(repo, schema, now)
activate_scheduled_jobs(repo(), schema(), DateTime.t()) :: integer()

Updates all jobs in the "SCHEDULED" state with scheduled time <= now to "AVAILABLE" state.

Returns the number of jobs updated.

available_jobs(schema, demand)
available_jobs(schema(), integer()) :: Ecto.Query.t()

Builds a query for a batch of available jobs.

The batch size is determined by demand.

delete_job_changeset(job)
delete_job_changeset(job()) :: Ecto.Changeset.t()

Creates an Ecto.Changeset that will delete a job, confirming that the attempt counter hasn’t been increased by another worker process.

fail_expired_jobs_at_max_attempts(repo, schema, now)
fail_expired_jobs_at_max_attempts(repo(), schema(), DateTime.t()) :: integer()

Updates all jobs that have been attempted the maximum number of times to "FAILED".

Returns the number of jobs updated.

initial_multi(job)
initial_multi(job()) :: Ecto.Multi.t()

Creates an Ecto.Multi struct with the command to delete the given job from the queue.

This will be passed as the first argument to the user supplied callback function.

progress_expiry(now, attempt, timeout_ms)
progress_expiry(DateTime.t(), integer(), integer()) :: DateTime.t()

Computes the expiry time for an "IN_PROGRESS" job based on the current time and attempt counter.

reservation_expiry(now, timeout_ms)
reservation_expiry(DateTime.t(), integer()) :: DateTime.t()

Computes the expiry time for a job reservation to be held, given the current time.
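As a rough illustration of the two expiry computations (progress_expiry/3 and reservation_expiry/2), the sketch below re-implements them with DateTime.add/3. ExpirySketch is a hypothetical module, and the linear scaling by the attempt counter is an assumption based on the description of update_job_in_progress/4; the library's own arithmetic and rounding may differ.

```elixir
defmodule ExpirySketch do
  @moduledoc "Hypothetical re-implementation of the two expiry computations."

  # A reservation is held for a fixed window after `now`.
  def reservation_expiry(now = %DateTime{}, timeout_ms) when is_integer(timeout_ms) do
    DateTime.add(now, timeout_ms, :millisecond)
  end

  # Assumption: the in-progress window grows linearly with the attempt counter.
  def progress_expiry(now = %DateTime{}, attempt, timeout_ms)
      when is_integer(attempt) and is_integer(timeout_ms) do
    DateTime.add(now, attempt * timeout_ms, :millisecond)
  end
end

{:ok, now, 0} = DateTime.from_iso8601("2020-01-01T00:00:00Z")
ExpirySketch.reservation_expiry(now, 60_000)   # one minute after `now`
ExpirySketch.progress_expiry(now, 2, 300_000)  # ten minutes after `now`
```

Under this assumption, each retry gets a longer window than the one before it, which gives slow jobs more room before they are considered expired.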

reserve_available_jobs(repo, schema, demand, now, timeout_ms)
reserve_available_jobs(repo(), schema(), integer(), DateTime.t(), integer()) ::
  {integer(), [job()]}

Updates a batch of jobs in "AVAILABLE" state to "RESERVED" state with a timeout.

The batch size is determined by demand. Returns a {count, updated_jobs} tuple.
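The functions documented so far could be combined into a single polling step, sketched below. PollSketch is a hypothetical module, and the order of the housekeeping calls is an assumption rather than something this page specifies; only the function names and argument lists are taken from the signatures above.

```elixir
defmodule PollSketch do
  @moduledoc "Hypothetical polling step built from the functions documented above."

  alias EctoJob.JobQueue

  # `repo` and `schema` are module names, for example MyApp.Repo and
  # MyApp.JobQueue (both assumed names).
  def poll(repo, schema, demand, now = %DateTime{}, timeout_ms) do
    # Housekeeping first, so that jobs that are due become "AVAILABLE".
    # Assumption: failing exhausted jobs before re-activating expired ones
    # keeps them from being handed out again.
    _scheduled = JobQueue.activate_scheduled_jobs(repo, schema, now)
    _failed = JobQueue.fail_expired_jobs_at_max_attempts(repo, schema, now)
    _reactivated = JobQueue.activate_expired_jobs(repo, schema, now)

    # Reserve up to `demand` jobs; each reservation is held for `timeout_ms`.
    {count, jobs} = JobQueue.reserve_available_jobs(repo, schema, demand, now, timeout_ms)
    {count, jobs}
  end
end
```

Passing now in as an argument, as the library functions themselves do, keeps every query in one polling step on the same timestamp.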

update_job_in_progress(repo, job, now, timeout_ms)
update_job_in_progress(repo(), job(), DateTime.t(), integer()) ::
  {:ok, job()} | {:error, :expired}

Transitions a job from "RESERVED" to "IN_PROGRESS".

Confirms that the job reservation hasn’t expired by checking:

  • The attempt counter hasn’t been changed
  • The state is still "RESERVED"
  • The expiry time is in the future

Updates the state to "IN_PROGRESS", increments the attempt counter, and sets a timeout proportional to the attempt counter and the expiry_timeout, which defaults to 300_000 ms (5 minutes) unless otherwise configured.

Returns {:ok, job} when successful, {:error, :expired} otherwise.
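The three checks and the update described above can be modelled in memory as follows. This is a sketch, not the library's implementation, which presumably performs the same checks as a conditional update in the database. InProgressSketch and the plain-map job shape are hypothetical, and scaling the timeout by the incremented attempt counter is an assumption.

```elixir
defmodule InProgressSketch do
  @moduledoc "Hypothetical in-memory model of the RESERVED -> IN_PROGRESS transition."

  # `stored` is the row as it currently exists; `reserved` is the copy the
  # worker holds. Both are plain maps with :state, :attempt and :expires keys.
  def update_job_in_progress(stored, reserved, now = %DateTime{}, timeout_ms) do
    if reservation_valid?(stored, reserved, now) do
      attempt = stored.attempt + 1

      {:ok,
       %{
         stored
         | state: "IN_PROGRESS",
           attempt: attempt,
           # Assumption: the timeout scales linearly with the new attempt count.
           expires: DateTime.add(now, attempt * timeout_ms, :millisecond)
       }}
    else
      {:error, :expired}
    end
  end

  # The reservation holds only if no other worker bumped the attempt counter,
  # the state is still "RESERVED", and the expiry time is strictly in the future.
  defp reservation_valid?(stored, reserved, now) do
    stored.attempt == reserved.attempt and
      stored.state == "RESERVED" and
      match?(%DateTime{}, stored.expires) and
      DateTime.compare(stored.expires, now) == :gt
  end
end
```

Comparing the attempt counter acts as an optimistic lock: a worker whose reservation was taken over by another process sees a changed counter and gets {:error, :expired}.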

Callbacks

perform(multi, params)
perform(multi :: Ecto.Multi.t(), params :: map()) :: any()

Job execution callback to be implemented by each JobQueue module.

Example

@impl true
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)
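A slightly fuller sketch of a single perform/2 clause follows. It assumes that the callback is expected to append its own operations to the Ecto.Multi it receives and then run it in a transaction. Because that multi already carries the command to delete the job (see initial_multi/1), the job's work and its removal from the queue would then commit together. MyApp.Repo and MyApp.WelcomeEmail are hypothetical names introduced for illustration, as is the "email" parameter.

```elixir
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  @impl true
  def perform(multi = %Ecto.Multi{}, %{"type" => "new_user", "email" => email}) do
    multi
    # MyApp.WelcomeEmail is a hypothetical Ecto schema with a :recipient field.
    |> Ecto.Multi.insert(:welcome_email, %MyApp.WelcomeEmail{recipient: email})
    # MyApp.Repo is the application's (assumed) Ecto repo.
    |> MyApp.Repo.transaction()
  end
end
```

If the transaction fails, the delete does not commit either, so the job stays in the queue and can be retried once its expiry time passes.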