ecto_job v0.3.0 EctoJob.JobQueue behaviour
Mixin for defining an Ecto schema for a Job Queue table in the database.
Example
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  @spec perform(Ecto.Multi.t(), map()) :: any()
  def perform(multi, job = %{}) do
    ...
  end
end
Summary
Functions
Updates all jobs in the "RESERVED" or "IN_PROGRESS" state with expires time <= now to the "AVAILABLE" state
Updates all jobs in the "SCHEDULED" state with scheduled time <= now to the "AVAILABLE" state
Builds a query for a batch of available jobs
Creates an Ecto.Changeset that will delete a job, confirming that the attempt counter hasn’t been increased by another worker process
Updates all jobs that have been attempted the maximum number of times to "FAILED"
Creates an Ecto.Multi struct with the command to delete the given job from the queue
Computes the expiry time for an "IN_PROGRESS" job based on the current time and attempt counter
Computes the expiry time for a job reservation to be held, given the current time
Updates a batch of jobs in the "AVAILABLE" state to the "RESERVED" state with a timeout
Transitions a job from "RESERVED" to "IN_PROGRESS"
Callbacks
Job execution callback to be implemented by each JobQueue module
Types
job() :: %module(){
  id: integer() | nil,
  state: state(),
  expires: DateTime.t() | nil,
  schedule: DateTime.t() | nil,
  attempt: integer(),
  max_attempts: integer() | nil,
  params: map(),
  notify: String.t() | nil,
  inserted_at: DateTime.t() | nil,
  updated_at: DateTime.t() | nil
}
A job Ecto.Schema struct.
repo()
An Ecto.Repo module name.
schema()
An Ecto.Schema module name.
state()
Job State enumeration:
- "SCHEDULED": The job is scheduled to run at a future time
- "AVAILABLE": The job is available to be run by the next available worker
- "RESERVED": The job has been reserved by a worker for execution
- "IN_PROGRESS": The job is currently being worked
- "FAILED": The job has exceeded max_attempts and will not be retried again
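The states above form a small state machine. The following is a minimal sketch of the transitions this page describes, written as pure functions. The module name JobStateSketch and the event atoms are hypothetical and not part of EctoJob; the source states for the "FAILED" transition are an assumption inferred from the function name fail_expired_jobs_at_max_attempts.

```elixir
defmodule JobStateSketch do
  # Hypothetical helper, not part of EctoJob: a pure-function model of the
  # state transitions described in this documentation.

  # Scheduled time has been reached (see activate_scheduled_jobs).
  def transition("SCHEDULED", :schedule_reached), do: {:ok, "AVAILABLE"}

  # A worker has claimed the job as part of a batch.
  def transition("AVAILABLE", :reserved), do: {:ok, "RESERVED"}

  # The worker has started executing the job.
  def transition("RESERVED", :started), do: {:ok, "IN_PROGRESS"}

  # The reservation or execution timeout elapsed (see activate_expired_jobs).
  def transition(state, :expired) when state in ["RESERVED", "IN_PROGRESS"],
    do: {:ok, "AVAILABLE"}

  # Assumption: only expired jobs that are out of attempts are failed.
  def transition(state, :max_attempts_reached) when state in ["RESERVED", "IN_PROGRESS"],
    do: {:ok, "FAILED"}

  # Anything else is not a transition described on this page.
  def transition(_state, _event), do: :error
end
```

Modelling the transitions this way makes it easy to see that "FAILED" is terminal: no clause accepts it as a starting state.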
Functions
activate_expired_jobs(repo(), schema(), DateTime.t()) :: integer()
Updates all jobs in the "RESERVED" or "IN_PROGRESS" state with expires time <= now to the "AVAILABLE" state.
Returns the number of jobs updated.
activate_scheduled_jobs(repo(), schema(), DateTime.t()) :: integer()
Updates all jobs in the "SCHEDULED" state with scheduled time <= now to the "AVAILABLE" state.
Returns the number of jobs updated.
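To illustrate the "scheduled time <= now" rule and the returned count, here is a minimal in-memory sketch. It is not how EctoJob works internally (the real function updates rows in the database); the module ActivateSketch and the plain-map job shape are assumptions made for illustration only.

```elixir
defmodule ActivateSketch do
  # Hypothetical in-memory model, not part of EctoJob.
  # Jobs are plain maps with :state and :schedule keys (assumed shape).
  # Returns {updated_jobs, count} so the count can be inspected.
  def activate_scheduled(jobs, now = %DateTime{}) do
    Enum.map_reduce(jobs, 0, fn job, count ->
      # DateTime.compare/2 returns :lt, :eq or :gt; "not :gt" means <= now.
      if job.state == "SCHEDULED" and DateTime.compare(job.schedule, now) != :gt do
        {%{job | state: "AVAILABLE"}, count + 1}
      else
        {job, count}
      end
    end)
  end
end

now = ~U[2020-01-01 12:00:00Z]

jobs = [
  %{id: 1, state: "SCHEDULED", schedule: ~U[2020-01-01 11:00:00Z]},
  %{id: 2, state: "SCHEDULED", schedule: ~U[2020-01-01 13:00:00Z]},
  %{id: 3, state: "AVAILABLE", schedule: nil}
]

# Only job 1 is due, so a single job is counted as updated.
{_updated, count} = ActivateSketch.activate_scheduled(jobs, now)
IO.inspect(count)
```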
available_jobs(schema(), integer()) :: Ecto.Query.t()
Builds a query for a batch of available jobs.
The batch size is determined by demand.
delete_job_changeset(job()) :: Ecto.Changeset.t()
Creates an Ecto.Changeset that will delete a job, confirming that the attempt counter hasn’t been increased by another worker process.
fail_expired_jobs_at_max_attempts(repo(), schema(), DateTime.t()) :: integer()
Updates all jobs that have been attempted the maximum number of times to "FAILED".
Returns the number of jobs updated.
Creates an Ecto.Multi struct with the command to delete the given job from the queue.
This will be passed as the first argument to the user-supplied callback function.
progress_expiry(DateTime.t(), integer(), integer()) :: DateTime.t()
Computes the expiry time for an "IN_PROGRESS" job based on the current time and attempt counter.
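This page does not give the exact formula. As a rough sketch only, assuming the timeout grows linearly with the attempt counter (one reading of "proportional"), the computation could look like the following. The module ExpirySketch, the argument order, and the linear formula are all assumptions; the actual EctoJob implementation may differ.

```elixir
defmodule ExpirySketch do
  # Hypothetical formula, not taken from EctoJob:
  # expiry = now + attempt * timeout_ms
  def progress_expiry(now = %DateTime{}, attempt, timeout_ms)
      when is_integer(attempt) and is_integer(timeout_ms) do
    DateTime.add(now, attempt * timeout_ms, :millisecond)
  end
end

now = ~U[2020-01-01 12:00:00Z]

# Second attempt with a 300_000 ms timeout: 2 * 300_000 ms after now.
expiry = ExpirySketch.progress_expiry(now, 2, 300_000)
IO.inspect(DateTime.diff(expiry, now, :second))
```

Under this assumption, later attempts are given progressively more time before the job is considered expired.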
reservation_expiry(DateTime.t(), integer()) :: DateTime.t()
Computes the expiry time for a job reservation to be held, given the current time.
Updates a batch of jobs in the "AVAILABLE" state to the "RESERVED" state with a timeout.
The batch size is determined by demand.
Returns a {count, updated_jobs} tuple.
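The shape of this operation can be sketched in memory as follows. This is an illustration under stated assumptions, not the library's implementation: the real function updates the database, and the module ReserveSketch, the plain-map job shape, and the selection order (list order) are hypothetical.

```elixir
defmodule ReserveSketch do
  # Hypothetical in-memory model, not part of EctoJob.
  # Takes at most `demand` jobs in the "AVAILABLE" state, marks them
  # "RESERVED" with an expiry, and returns {count, updated_jobs}.
  def reserve(jobs, demand, now = %DateTime{}, timeout_ms) do
    expires = DateTime.add(now, timeout_ms, :millisecond)

    reserved =
      jobs
      |> Enum.filter(fn job -> job.state == "AVAILABLE" end)
      |> Enum.take(demand)
      |> Enum.map(fn job -> Map.merge(job, %{state: "RESERVED", expires: expires}) end)

    {length(reserved), reserved}
  end
end

jobs = [
  %{id: 1, state: "AVAILABLE"},
  %{id: 2, state: "SCHEDULED"},
  %{id: 3, state: "AVAILABLE"},
  %{id: 4, state: "AVAILABLE"}
]

# With a demand of 2, only the first two available jobs are reserved.
{count, reserved} = ReserveSketch.reserve(jobs, 2, ~U[2020-01-01 12:00:00Z], 60_000)
IO.inspect({count, Enum.map(reserved, & &1.id)})
```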
update_job_in_progress(repo(), job(), DateTime.t(), integer()) :: {:ok, job()} | {:error, :expired}
Transitions a job from "RESERVED" to "IN_PROGRESS".
Confirms that the job reservation hasn’t expired by checking:
- The attempt counter hasn’t been changed
- The state is still "RESERVED"
- The expiry time is in the future
Updates the state to "IN_PROGRESS", increments the attempt counter, and sets a timeout proportional to the attempt counter and the expiry_timeout, which defaults to 300_000 ms (5 minutes) unless otherwise configured.
Returns {:ok, job} when successful, {:error, :expired} otherwise.
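The three checks above can be sketched in memory as follows. This is a hedged illustration only: the real function performs the checks and the update in the database, and the module ReservationSketch, the plain-map job shape, and the use of the incremented attempt counter in the timeout are assumptions.

```elixir
defmodule ReservationSketch do
  # Hypothetical in-memory model, not part of EctoJob.
  # `current` is the job as currently stored; `reserved` is the copy the
  # worker received when it reserved the job.
  def start_job(current, reserved, now = %DateTime{}, timeout_ms) do
    still_valid? =
      current.attempt == reserved.attempt and
        current.state == "RESERVED" and
        DateTime.compare(current.expires, now) == :gt

    if still_valid? do
      attempt = current.attempt + 1
      # Assumption: timeout scales linearly with the new attempt counter.
      expires = DateTime.add(now, attempt * timeout_ms, :millisecond)
      {:ok, %{current | state: "IN_PROGRESS", attempt: attempt, expires: expires}}
    else
      {:error, :expired}
    end
  end
end

now = ~U[2020-01-01 12:00:00Z]
job = %{state: "RESERVED", attempt: 0, expires: ~U[2020-01-01 12:05:00Z]}

# The reservation is intact, so the job moves to "IN_PROGRESS".
{:ok, started} = ReservationSketch.start_job(job, job, now, 300_000)
IO.inspect({started.state, started.attempt})
```

If another worker had already bumped the attempt counter, or the reservation had lapsed, the same call would return {:error, :expired}.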
Callbacks
perform(multi :: Ecto.Multi.t(), params :: map()) :: any()
Job execution callback to be implemented by each JobQueue module.
Example
@impl true
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)