Job Queue v0.1.0 JobQueue.Queue
A GenStage based Queue that supports retries, tracking of in-progress jobs
and deduplication. The queue itself is based on an erlang :queue
.
Being based on GenStage, the Queue will not process any events until a consumer is attached that requests demand.
Example
iex> # Setup a Worker
...> defmodule MyWorker do
...> use GenStage
...>
...> def start_link() do
...> GenStage.start_link(__MODULE__, :ok)
...> end
...>
...> def init(:ok) do
...> {:consumer, :ok, subscribe_to: [{MyWorkerQueue, min_demand: 1, max_demand: 3}]}
...> end
...>
...> def handle_events(events, _from, state) do
...> for job = %JobQueue.Job{from: from, event: event} <- events do
...> Process.send(from, event, [])
...> Queue.ack(job)
...> end
...>
...> {:noreply, [], state}
...> end
...> end
...>
...> # Start the Queue
...> {:ok, _queue} = Queue.start_link(MyWorkerQueue)
...>
...> # Start the Consumer
...> {:ok, _worker} = MyWorker.start_link()
...>
...> # Do some work
...> Queue.add(MyWorkerQueue, "Work Done")
...> receive do
...> "Work Done" -> "Success"
...> after
...> 500 -> "Failure"
...> end
"Success"
This example uses a GenStage consumer for the worker. You could theoretically
use multiple stages as long as the final consumer calls JobQueue.Queue.ack
or
JobQueue.Queue.nack
. JobQueue
also provides some assistance for creating simple,
one-step workers using JobQueue.Processor
and JobQueue.Worker
.
Summary
Functions
Acknowledges a given job. This removes it from the in_progress
list
Acknowledges a given job with a reply. The reply will be sent to the Queuing process
as long as it was queued with JobQueue.Queue.add(queue, event, reply: true)
Adds an event to the given queue with the given options
Adds an event to the given queue and waits for it to finish
Adds a list of {id, event}
to the given queue and waits for them all to finish
Negative Acknowledge a give job. This removes it from the in_progress
list
Starts a JobQueue.Queue
with the given options
Retrieves the current state of the Queue
Types
Functions
Specs
ack(JobQueue.Job.t) :: :ok
Acknowledges a given job. This removes it from the in_progress
list.
Specs
ack(JobQueue.Job.t, any) :: :ok
Acknowledges a given job with a reply. The reply will be sent to the Queuing process
as long as it was queued with JobQueue.Queue.add(queue, event, reply: true)
.
Specs
add(any, any, Keyword.t) :: :ok
Adds an event to the given queue with the given options.
Options
reply
- Wether the job will send a reply to the initiating process whenJobQueue.Queue.ack/2
is called.
Specs
add_sync(any, any, integer) ::
{:ok, any} |
{:error, any}
Adds an event to the given queue and waits for it to finish.
Specs
Adds a list of {id, event}
to the given queue and waits for them all to finish.
Options
instance_timeout
- The timeout for any one event to process. Defaults to5_000
.batch_timeout
- The timeout for the entire batch. Defaults to20_000
.
Specs
nack(JobQueue.Job.t) :: :ok
Negative Acknowledge a give job. This removes it from the in_progress
list.
Specs
start_link(any, Keyword.t) :: GenServer.on_start
Starts a JobQueue.Queue
with the given options.
Options
max_retries
- The maximum number of times jobs in this queue will retry before failing.dedupe
- Whether or not the queue discards duplicate events that are already in the queue.
Return values
If the Queue is successfully created and initialized, this function returns
{:ok, pid}
, where pid
is the pid of the stage. If a process with the
specified name already exists, this function returns
{:error, {:already_started, pid}}
with the pid of that process.
If the JobQueue.Queue.init/1
callback fails with reason
, this function returns
{:error, reason}
. Otherwise, if JobQueue.Queue.init/1
returns {:stop, reason}
or :ignore
, the process is terminated and this function returns
{:error, reason}
or :ignore
, respectively.