Job Queue v0.1.0

JobQueue.Queue

A GenStage-based queue that supports retries, tracking of in-progress jobs, and deduplication. The queue itself is backed by an Erlang :queue.

Because it is built on GenStage, the queue will not emit any events until a consumer subscribes to it and sends demand.
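As a rough illustration of the underlying data structure, the sketch below uses only Erlang's :queue module from the standard library. It is not JobQueue's implementation: the job names are made up, and the :queue.member/2 check is just one way a deduplication test could be written.

```elixir
# Build a FIFO queue with Erlang's :queue (ships with every Elixir install).
q = :queue.new()
q = :queue.in("job-1", q)
q = :queue.in("job-2", q)

# A membership check like this is one possible basis for deduplication
# (an assumption for illustration, not necessarily what JobQueue does).
already_queued? = :queue.member("job-2", q)

# Items come out in the order they went in.
{{:value, first}, rest} = :queue.out(q)

IO.inspect({first, :queue.len(rest), already_queued?})
```

Because :queue is an immutable structure, every operation returns a new queue, which is why the result is rebound at each step.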

Example

iex> alias JobQueue.Queue
...>
...> # Set up a worker
...> defmodule MyWorker do
...>   use GenStage
...>
...>   def start_link() do
...>     GenStage.start_link(__MODULE__, :ok)
...>   end
...>
...>   def init(:ok) do
...>     {:consumer, :ok, subscribe_to: [{MyWorkerQueue, min_demand: 1, max_demand: 3}]}
...>   end
...>
...>   def handle_events(events, _from, state) do
...>     for job = %JobQueue.Job{from: from, event: event} <- events do
...>       Process.send(from, event, [])
...>       Queue.ack(job)
...>     end
...>
...>     {:noreply, [], state}
...>   end
...> end
...>
...> # Start the Queue
...> {:ok, _queue} = Queue.start_link(MyWorkerQueue)
...>
...> # Start the Consumer
...> {:ok, _worker} = MyWorker.start_link()
...>
...> # Do some work
...> Queue.add(MyWorkerQueue, "Work Done")
...> receive do
...>   "Work Done" -> "Success"
...> after
...>   500 -> "Failure"
...> end
"Success"

This example uses a single GenStage consumer as the worker. In principle you could use multiple stages, as long as the final consumer calls JobQueue.Queue.ack/1, JobQueue.Queue.ack/2, or JobQueue.Queue.nack/1. JobQueue also provides some assistance for creating simple, one-step workers through JobQueue.Processor and JobQueue.Worker.

Summary

Functions

ack(job)

Acknowledges a given job. This removes it from the in_progress list.

ack(job, reply)

Acknowledges a given job with a reply. The reply will be sent to the queuing process as long as the job was queued with JobQueue.Queue.add(queue, event, reply: true).

add(queue_name, event, options \\ [])

Adds an event to the given queue with the given options.

add_sync(module, message, timeout \\ 5000)

Adds an event to the given queue and waits for it to finish.

add_sync_batch(module, messages, options \\ [])

Adds a list of {id, event} to the given queue and waits for them all to finish.

nack(job)

Negatively acknowledges a given job. This removes it from the in_progress list.

start_link(name, options \\ [])

Starts a JobQueue.Queue with the given options.

state(queue_name)

Retrieves the current state of the Queue.

Types

t :: %{name: String.t, q: :queue.queue, in_progress: %{}, pending_demand: integer, max_retries: integer, dedupe: boolean}

Functions

ack(job)

Specs

ack(JobQueue.Job.t) :: :ok

Acknowledges a given job. This removes it from the in_progress list.

ack(job, reply)

Specs

ack(JobQueue.Job.t, any) :: :ok

Acknowledges a given job with a reply. The reply will be sent to the queuing process as long as the job was queued with JobQueue.Queue.add(queue, event, reply: true).

add(queue_name, event, options \\ [])

Specs

add(any, any, Keyword.t) :: :ok

Adds an event to the given queue with the given options.

Options

  • reply - Whether the job will send a reply to the initiating process when JobQueue.Queue.ack/2 is called.

add_sync(module, message, timeout \\ 5000)

Specs

add_sync(any, any, integer) ::
  {:ok, any} |
  {:error, any}

Adds an event to the given queue and waits for it to finish.
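A minimal sketch of handling the two return shapes listed in the spec above. SyncExample, run/3, and describe/1 are hypothetical names that are not part of JobQueue, and the sketch assumes a worker is attached to the queue and acknowledges the job so that add_sync/3 can return.

```elixir
defmodule SyncExample do
  # Hypothetical helper, not part of JobQueue.

  # Queue an event and block until it finishes or the timeout (ms) elapses.
  def run(queue, event, timeout \\ 5_000) do
    queue
    |> JobQueue.Queue.add_sync(event, timeout)
    |> describe()
  end

  # Pure function over the documented return shapes of add_sync/3.
  def describe({:ok, result}), do: "finished: #{inspect(result)}"
  def describe({:error, reason}), do: "failed: #{inspect(reason)}"
end
```

With the queue from the module example running, a call might look like SyncExample.run(MyWorkerQueue, "Work Done", 1_000). Keeping describe/1 separate from the blocking call makes the result handling easy to test on its own.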

add_sync_batch(module, messages, options \\ [])

Specs

add_sync_batch(term, [{any, any}], Keyword.t) ::
  {:ok, [{any, {:ok, any}}]} |
  {:error, String.t, any}

Adds a list of {id, event} to the given queue and waits for them all to finish.

Options

  • instance_timeout - The timeout for any one event to process. Defaults to 5_000.
  • batch_timeout - The timeout for the entire batch. Defaults to 20_000.
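The following sketch shows one way to call add_sync_batch/3 with both options and to unpack the result. BatchExample, run/2, and collect/1 are hypothetical names, the timeout values are arbitrary, and the meaning of the third element of the error tuple is not documented here, so the sketch passes it through untouched.

```elixir
defmodule BatchExample do
  # Hypothetical helper, not part of JobQueue.

  # `messages` is a list of {id, event} tuples, as described above.
  def run(queue, messages) do
    queue
    |> JobQueue.Queue.add_sync_batch(messages, instance_timeout: 1_000, batch_timeout: 5_000)
    |> collect()
  end

  # On success, turn [{id, {:ok, result}}] into a map of id => result.
  def collect({:ok, results}) do
    {:ok, Map.new(results, fn {id, {:ok, result}} -> {id, result} end)}
  end

  # On failure, pass the message and the extra term through unchanged.
  def collect({:error, message, details}), do: {:error, message, details}
end
```

A call such as BatchExample.run(MyWorkerQueue, [{:a, "first"}, {:b, "second"}]) would then return either {:ok, map} keyed by the ids you supplied or the original error tuple.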

nack(job)

Specs

nack(JobQueue.Job.t) :: :ok

Negatively acknowledges a given job. This removes it from the in_progress list.
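As a sketch of how ack/1 and nack/1 might be paired in a worker, the helper below acknowledges a job when its work function succeeds and negatively acknowledges it when the function raises. SafeRunner and its queue_mod parameter are hypothetical; the parameter exists only so a stand-in module can be substituted for JobQueue.Queue when trying the code out. How nack/1 interacts with the max_retries option is not stated in this section, so nothing is assumed about it.

```elixir
defmodule SafeRunner do
  # Hypothetical helper, not part of JobQueue. `queue_mod` defaults to
  # JobQueue.Queue and is a parameter only so a stand-in can be passed in.
  def run(job, work_fun, queue_mod \\ JobQueue.Queue) do
    # Only exceptions are rescued here; exits and throws are not handled.
    outcome =
      try do
        {:ok, work_fun.(job.event)}
      rescue
        error -> {:error, error}
      end

    case outcome do
      # Success: acknowledge so the job leaves the in_progress list.
      {:ok, _result} -> queue_mod.ack(job)
      # Failure: negatively acknowledge instead.
      {:error, _error} -> queue_mod.nack(job)
    end
  end
end
```

Inside a consumer's handle_events/3, each job could be passed through this helper, for example Enum.each(events, &SafeRunner.run(&1, fn event -> IO.inspect(event) end)).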

start_link(name, options \\ [])

Specs

start_link(any, Keyword.t) :: GenServer.on_start

Starts a JobQueue.Queue with the given options.

Options

  • max_retries - The maximum number of times jobs in this queue will retry before failing.

  • dedupe - Whether or not the queue discards duplicate events that are already in the queue.

Return values

If the Queue is successfully created and initialized, this function returns {:ok, pid}, where pid is the pid of the stage. If a process with the specified name already exists, this function returns {:error, {:already_started, pid}} with the pid of that process.

If the JobQueue.Queue.init/1 callback fails with reason, this function returns {:error, reason}. Otherwise, if JobQueue.Queue.init/1 returns {:stop, reason} or :ignore, the process is terminated and this function returns {:error, reason} or :ignore, respectively.
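The return values above can be handled with a small wrapper. This is a sketch only: QueueStarter, ensure_started/2, and normalize/1 are hypothetical names, and the option values shown are arbitrary examples rather than the library's defaults.

```elixir
defmodule QueueStarter do
  # Hypothetical helper, not part of JobQueue.

  # Start the queue, or reuse it if a process with this name already exists.
  def ensure_started(name, options \\ [max_retries: 3, dedupe: true]) do
    name
    |> JobQueue.Queue.start_link(options)
    |> normalize()
  end

  # Treat "already started" as success and hand back the existing pid.
  def normalize({:ok, pid}), do: {:ok, pid}
  def normalize({:error, {:already_started, pid}}), do: {:ok, pid}
  def normalize(:ignore), do: :ignore
  def normalize({:error, reason}), do: {:error, reason}
end
```

Calling QueueStarter.ensure_started(MyWorkerQueue) twice in a row would then return {:ok, pid} both times instead of an error on the second call.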

state(queue_name)

Specs

state(any) :: t

Retrieves the current state of the Queue.
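Since the returned value follows the t type shown earlier, a few counters can be derived from it. The sketch below assumes only the q, in_progress, and pending_demand fields from that type; QueueStats, fetch/1, and summarize/1 are hypothetical names.

```elixir
defmodule QueueStats do
  # Hypothetical helper, not part of JobQueue.

  # Fetch the queue's state and reduce it to a few counters.
  def fetch(queue_name) do
    queue_name
    |> JobQueue.Queue.state()
    |> summarize()
  end

  # Works on any map shaped like the `t` type: `q` is an Erlang :queue and
  # `in_progress` is a map.
  def summarize(%{q: q, in_progress: in_progress, pending_demand: demand}) do
    %{queued: :queue.len(q), in_progress: map_size(in_progress), pending_demand: demand}
  end
end
```

QueueStats.fetch(MyWorkerQueue) could be useful in a health check or while debugging a stalled consumer, for instance to see whether jobs are piling up in the queue or sitting unacknowledged.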