EctoJob.Producer (ecto_job v3.1.0)
GenStage producer responsible for reserving available jobs from a job queue and passing them on to the consumer module.
The GenStage will buffer demand when there are insufficient jobs available in the database.
Installs a timer to check for expired jobs, and uses a Postgrex.Notifications listener to dispatch jobs immediately when new jobs are inserted into the database and there is pending demand.
Summary
Functions
handle_demand/2: Dispatch jobs according to the new demand plus any buffered demand.
handle_info/2: Messages from the timer and the notifications listener will be handled in handle_info.
init/1: Starts the sweeper timer to activate scheduled/expired jobs and starts listening for new job notifications.
start_link/1: Starts the producer GenStage process.
Types
Specs
notifier() :: pid()
Specs
repo() :: module()
Specs
schema() :: module()
Specs
timeout_ms() :: non_neg_integer()
Functions
Specs
handle_demand(integer(), EctoJob.Producer.State.t()) :: {:noreply, [EctoJob.JobQueue.job()], EctoJob.Producer.State.t()}
Dispatch jobs according to the new demand plus any buffered demand.
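The "new demand plus any buffered demand" rule can be illustrated with a minimal sketch. This is a toy model, not the ecto_job implementation: the module name DemandBufferSketch and its dispatch/3 function are hypothetical, and jobs are plain terms in a list rather than rows reserved from the database.

```elixir
defmodule DemandBufferSketch do
  @moduledoc """
  Toy model of demand buffering (hypothetical, not part of ecto_job).
  """

  # Returns {jobs_to_dispatch, jobs_left_in_queue, unmet_demand}.
  # unmet_demand is what a producer would keep buffered until more jobs arrive.
  def dispatch(available_jobs, buffered_demand, new_demand)
      when is_list(available_jobs) and buffered_demand >= 0 and new_demand >= 0 do
    total_demand = buffered_demand + new_demand

    # Take at most total_demand jobs; Enum.split/2 tolerates short lists.
    {to_dispatch, remaining} = Enum.split(available_jobs, total_demand)

    {to_dispatch, remaining, total_demand - length(to_dispatch)}
  end
end
```

When fewer jobs are available than were asked for, the shortfall is carried forward as buffered demand and can be satisfied by a later call.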
Specs
handle_info(term(), EctoJob.Producer.State.t()) :: {:noreply, [EctoJob.JobQueue.job()], EctoJob.Producer.State.t()}
Messages from the timer and the notifications listener will be handled in handle_info.
:poll messages will attempt to activate jobs, and dispatch them according to current demand.
:notification messages will dispatch any active jobs according to current demand.
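The difference between the two message types can be sketched as follows. This is an illustration under assumptions, not the library's code: HandleInfoSketch and its struct fields are hypothetical, every scheduled job is treated as due on each poll, and the notification is assumed to arrive in the tuple shape that Postgrex.Notifications sends to listeners.

```elixir
defmodule HandleInfoSketch do
  # Toy state (hypothetical): :scheduled jobs are not yet runnable,
  # :active jobs are, and :demand counts jobs consumers still wait for.
  defstruct scheduled: [], active: [], demand: 0

  # :poll first activates jobs, then dispatches against current demand.
  # Simplification: all scheduled jobs are activated, ignoring timestamps.
  def handle_info(:poll, %__MODULE__{} = state) do
    activated = %{state | scheduled: [], active: state.active ++ state.scheduled}
    dispatch(activated)
  end

  # A notification skips activation and only dispatches already-active jobs.
  def handle_info({:notification, _pid, _ref, _channel, _payload}, %__MODULE__{} = state) do
    dispatch(state)
  end

  # Emit up to :demand active jobs and keep the unmet demand in the state.
  defp dispatch(%__MODULE__{active: active, demand: demand} = state) do
    {jobs, rest} = Enum.split(active, demand)
    {:noreply, jobs, %{state | active: rest, demand: demand - length(jobs)}}
  end
end
```

Both clauses return the {:noreply, jobs, state} shape given in the spec above.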
Specs
init(EctoJob.Producer.State.t()) :: {:producer, EctoJob.Producer.State.t()}
Starts the sweeper timer to activate scheduled/expired jobs and starts listening for new job notifications.
Specs
start_link(
  name: atom(),
  repo: repo(),
  schema: schema(),
  notifier: atom(),
  poll_interval: non_neg_integer(),
  reservation_timeout: timeout_ms(),
  execution_timeout: timeout_ms(),
  notifications_listen_timeout: timeout_ms()
) :: {:ok, pid()}
Starts the producer GenStage process.
name: The process name to register this GenStage as
repo: The Ecto Repo module to use for querying
schema: The EctoJob.JobQueue module to query
notifier: The name of the Postgrex.Notifications notifier process
poll_interval: Timer interval for activating scheduled/expired jobs
notifications_listen_timeout: Time in milliseconds that Notifications.listen!/3 is allotted to start listening to notifications from Postgrex for new jobs
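A call using the options from the spec might look like the sketch below. All module names (MyApp.Repo, MyApp.JobQueue, and the process names) and all numeric values are placeholders chosen for illustration, and poll_interval is assumed here to be in milliseconds. The call only succeeds in an application that depends on ecto_job, has a started Repo, and has a Postgrex.Notifications process registered under the given notifier name.

```elixir
# Placeholder names and values; adjust to the host application.
opts = [
  name: MyApp.JobQueue.Producer,
  repo: MyApp.Repo,
  schema: MyApp.JobQueue,
  notifier: MyApp.JobQueue.Notifier,
  poll_interval: 60_000,
  reservation_timeout: 60_000,
  execution_timeout: 300_000,
  notifications_listen_timeout: 5_000
]

# Per the spec above, a successful start returns {:ok, pid}.
{:ok, _pid} = EctoJob.Producer.start_link(opts)
```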