JobQueue

Build Status Hex.pm Inline docs Deps Status License

JobQueue is a small library for building job queues in Elixir. It is based on GenStage and erlang :queues.

The goal of the library is to simplify creation of queues with both single and multiple steps, as well as retrying of individual steps and deduplication of events.

Example Use Case

JobQueue allows building complex workflows such as:

Image Resizer

  • Queue a job with a link to an image to download
  • Master Queue
  • This queue can deduplicate urls so that it won’t re-download an image that is already being processed
  • This queue can also re-try if any sub-jobs fail
  • This queue is broken down to the following steps:

    • Step 1: Download the image
    • With one download queue, this can limit the total number of simultaneous downloads
    • Abort entire pipeline if download fails
    • Step 2: Fan out jobs to resize the image to multiple sizes
    • Limit the number of simultaneous resizes
    • Abort all sizes if one resize fails
    • Step 3: Upload each size to another S3 Bucket
    • Limit the number of simultaneous downloads
    • Retry upload on failure
    • Step 4: Cleanup
    • Cleanup on failures or success
    • Acknowledge job to Master Queue
  • Retry the whole job depending on the nature of sub-job failures

Installation

Add job_queue to your list of dependencies in mix.exs:

def deps do
  [
    {:job_queue, "~> 0.1.0"}
  ]
end

Documentation

Full documentation can be found on hexdocs at https://hexdocs.pm/job_queue/

Quick Start

Write a Worker module to handle your events:

defmodule Worker do
  use JobQueue.Worker

  def handle_event(event) do
    IO.inspect(event)
    {:ok, event}
  end
end

Start the Queue and Processor:

{:ok, _queue} = JobQueue.Queue.start_link(MyQueue)
{:ok, _processor} = JobQueue.Processor.start_link(MyQueue, Worker)

Add some work to the Queue:

JobQueue.Queue.add_sync(MyQueue, :done)
#=> :done

You can also start the queue and processor from a supervisor:

defmodule MyApp.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(JobQueue.Queue, [MyQueue], id: MyQueue),
      worker(JobQueue.Processor, [MyQueue, MyWorker], id: MyWorker),
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end