Changelog

All notable changes to Oban.Pro are documented here.

v0.9.0 — 2021-09-15

Partitioned Rate Limiting

Driven by popular demand, v0.9 brings partitions to the Smart Engine's rate limiter. With partitions, rate limits are applied per-worker, on args, or on a subset of args fields rather than across the entire queue. This enables your application to enforce limits per-customer or respect external throttling, without splitting jobs into multiple queues.

rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]]

Alternatively, you can partition by the account_id field in a job's args:

rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]

Naturally, a combination of worker, args, and any number of keys works.
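For example, here is a sketch of a limit partitioned by both worker and account_id. It assumes the fields and keys options combine the same way they do in the single-field examples above:

```elixir
# Allow 100 jobs every 5 seconds for each distinct worker/account_id pair
rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker, :args], keys: [:account_id]]]
```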

Check out the SmartEngine Guide for options and details.
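To make the grouping concrete, here is a hypothetical, pure-Elixir sketch of a single rate limit window with partitions. It is not the SmartEngine's implementation. The PartitionSketch module, the plain-map job shape, and the string-keyed args are assumptions made for illustration. Each distinct partition key gets its own allowed budget, so one busy account can't consume another account's share:

```elixir
defmodule PartitionSketch do
  # Hypothetical illustration only, not Oban Pro's code.
  # Builds a partition key from the configured fields. When keys are given,
  # only those args fields contribute. Args are assumed to be string-keyed,
  # as they are after a JSON round trip.
  def partition_key(job, opts) do
    fields = Keyword.fetch!(opts, :fields)
    keys = opts |> Keyword.get(:keys, []) |> Enum.map(&to_string/1)

    Enum.map(fields, fn
      :worker -> job.worker
      :args when keys == [] -> job.args
      :args -> Map.take(job.args, keys)
    end)
  end

  # Splits jobs into those allowed in the current window and those that must
  # wait, counting each partition against its own `allowed` budget.
  def take_allowed(jobs, allowed, partition_opts) do
    {runnable, waiting, _counts} =
      Enum.reduce(jobs, {[], [], %{}}, fn job, {run, wait, counts} ->
        key = partition_key(job, partition_opts)
        used = Map.get(counts, key, 0)

        if used < allowed do
          {[job | run], wait, Map.put(counts, key, used + 1)}
        else
          {run, [job | wait], counts}
        end
      end)

    {Enum.reverse(runnable), Enum.reverse(waiting)}
  end
end
```

With fields: [:worker] alone, every job from the same worker shares one budget. Adding :args with keys narrows that to one budget per worker and account.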

Batch Worker Improvements

The batch worker is the oldest worker in Pro and predates the meta field. That difference is finally rectified. Building batches on meta enables a handful of ergonomic improvements and new functionality:

  • Forwarding args through the batch_callback_args option
  • Heterogeneous batches with an alternative callback module through the batch_callback_worker option
  • Fetching all jobs in a batch with stream_batch_jobs/2 for map/reduce processing

Check out the updated Batch Guide for more examples, configuration options, use cases, and typespecs.

IMPORTANT: For in-flight batch callback jobs to run after an upgrade, you'll need to migrate batch_id and callback from args into meta. Run the following SQL in a migration immediately prior to, or following, the upgrade.

UPDATE oban_jobs
SET meta = jsonb_set_lax(
             jsonb_set(meta, '{batch_id}', args->'batch_id'),
             '{callback}',
             args->'callback',
             true,
             'return_target'
           )
WHERE state NOT IN ('cancelled', 'completed', 'discarded')
  AND args ? 'batch_id'
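Wrapped in an Ecto migration, the update might look like the sketch below. The module name is hypothetical, and the sketch assumes oban_jobs lives in the default prefix; qualify the table name if you run Oban with a custom prefix. Note that jsonb_set_lax requires PostgreSQL 13 or later.

```elixir
defmodule MyApp.Repo.Migrations.MoveBatchFieldsToMeta do
  use Ecto.Migration

  # Copy batch_id (and callback, when present) from args into meta for any
  # job that may still run, so in-flight batch callbacks survive the upgrade.
  def up do
    execute("""
    UPDATE oban_jobs
    SET meta = jsonb_set_lax(
                 jsonb_set(meta, '{batch_id}', args->'batch_id'),
                 '{callback}',
                 args->'callback',
                 true,
                 'return_target'
               )
    WHERE state NOT IN ('cancelled', 'completed', 'discarded')
      AND args ? 'batch_id'
    """)
  end

  # The data change isn't reversed; rolling back leaves meta untouched.
  def down, do: :ok
end
```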

Workflow Worker Improvements

Workflows got a little ergonomic love, too. Now you can dynamically extend workflows at runtime with the new append_workflow function:

def process(%Job{} = job) do
  jobs =
    job
    |> append_workflow(check_deps: false)
    |> add(:d, WorkerD.new(%{}), deps: [:a])
    |> add(:e, WorkerE.new(%{}), deps: [:b])
    |> add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

  {:ok, jobs}
end

Note the use of check_deps: false to skip dependency validation. To check dependencies safely while appending, use the new stream_workflow_jobs/1 function to load all of the previous jobs and feed them in. Here the stream is enumerated inside a transaction:

def process(%Job{} = job) do
  {:ok, jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.to_list()
    end)

  jobs
  |> append_workflow()
  |> add(:d, WorkerD.new(%{}), deps: [:a])
  |> add(:e, WorkerE.new(%{}), deps: [:b])
  |> add(:f, WorkerF.new(%{}), deps: [:c])
  |> Oban.insert_all()

  :ok
end
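Why loading the earlier jobs matters can be sketched as a plain dependency check: an appended job's deps can only be validated against names that are already known. The DepsCheckSketch module and its {name, deps} input shape are hypothetical and are not Oban Pro's implementation:

```elixir
defmodule DepsCheckSketch do
  # Hypothetical illustration only, not Oban Pro's code.
  # known: names already in the workflow (e.g. loaded from earlier jobs)
  # additions: {name, deps} pairs being appended, in insertion order
  def check_deps(known, additions) do
    {_seen, missing} =
      Enum.reduce(additions, {MapSet.new(known), []}, fn {name, deps}, {seen, missing} ->
        unknown = Enum.reject(deps, &MapSet.member?(seen, &1))
        # Later additions may depend on earlier ones, so record this name too
        {MapSet.put(seen, name), missing ++ Enum.map(unknown, &{name, &1})}
      end)

    case missing do
      [] -> :ok
      _ -> {:error, {:unknown_deps, missing}}
    end
  end
end
```

Without the earlier jobs, every dep on :a, :b, or :c looks unknown, which is exactly the case check_deps: false sidesteps.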

Check out the updated Workflow Guide for details on appending to workflows, job streaming, and complete typespecs.

Changed

  • [Oban] Require Oban ~> 2.9 to support the new cancel_all_jobs engine callback.

  • [Oban.Pro.Queue.SmartEngine] Make operations like refresh more efficient (transferring less data) and more failure tolerant to prevent producer crashes.

v0.8.2 — 2021-07-30

Fixed

  • [Workflow] Ensure a minimum snooze when all deps are scheduled.

    When an upstream dependency was scheduled to run immediately or at some point in the past, the dependent job would return an invalid value like {:snooze, 0}. That result was treated as a success, so the job wouldn't run as expected.

  • [DynamicCron] Prevent crashing when a cron's worker module is missing. This could happen during rolling or blue/green deploys.

  • [PG] Correct ident check when receiving scoped messages for pause, resume, or scale type actions.

v0.8.1 — 2021-06-01

Fixed

  • [SmartEngine] Correctly track the rate limit window between days.

    Rate limited queues could get stuck when fetch invocations straddled midnight. In that scenario the queue wouldn't start fetching more until the clock time for the current day exceeded the last recorded time from the previous day.

  • [SmartEngine] Silence operations in multi transactions. The log level wasn't respected for some frequently used operations.

  • [Relay] Remove "local" optimization that could incorrectly prevent results from getting back to the calling node.

v0.8.0 — 2021-05-25

Added

  • [PG] An alternate Oban.Notifier implementation based on pg/pg2 for use with Distributed Erlang.

  • [DynamicCron] Store timezone overrides for individual cron entries.

    In addition to the DynamicCron plugin's own timezone, individual cron entries now support a timezone override. The timezone can be set when inserting or updating cron entries.

Changed

  • [Relay] Decode relay results using the :safe option to prevent possible atom table exhaustion or remote code injection.

Fixed

  • [SmartEngine] Implementing a cancel_job/2 callback within the engine makes cancelling jobs safe. Previously, cancelling a job could leave the job id in the list of running jobs, which could eventually exhaust the concurrency limit and prevent a queue from running new jobs.

v0.7.3 — 2021-05-19

Changed

  • [SmartEngine] Guard all SmartEngine database calls against connection errors.

    Database connections may be unstable for many reasons: pool contention, database overload, crashes. We shouldn't crash the producer due to a flickering database call.

Fixed

  • [DynamicCron] Explicitly declare cron timestamps as DateTime to compensate for apps that configure Ecto to use NaiveDateTime by default.

v0.7.2 — 2021-04-11

Changed

  • [Lifeline] The plugin now only runs alongside the SmartEngine. Running with the BasicEngine is flawed and will cause inadvertent orphan rescues.

    You can either switch to the SmartEngine or remove Lifeline from your plugins.

Fixed

  • [SmartEngine] Consider the configured prefix for all ack and update options.

v0.7.1 — 2021-04-07

Changed

  • [Relay] Restore the ability to use Elixir 1.9 by removing use of the is_map_key guard that wasn't introduced until Elixir 1.10.

  • [SmartEngine] Apply jitter and more selective use of the mutex used for global coordination. The engine also avoids touching or reloading producer records to minimize connection use.

  • [Workflow] Improve deps check performance by using an index-powered operator. On instances with millions of jobs, this may improve performance by ~600x.

v0.7.0 — 2021-04-02

Pluggable Smart Engine

Pro's SmartEngine enables truly global concurrency and global rate limiting by handling demand at the queue level, without any conflict or churn. The engine uses centralized producer records to coordinate with minimal load on the database.

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    alpha: 1,
    gamma: [global_limit: 1],
    delta: [local_limit: 2, global_limit: 5],
    kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
    omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
  ],
  ...

Check out the SmartEngine Guide for a walkthrough and details.

Relay Plugin

The Relay plugin lets you insert and await the results of jobs locally or remotely, across any number of nodes, i.e. persistent distributed tasks. Once the plugin is running, you can seamlessly distribute Oban jobs and await the results synchronously:

alias Oban.Pro.Plugins.Relay

1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Relay.async/1)
|> Relay.await_many(timeout: :timer.seconds(1))

# [{:ok, 2}, {:ok, 4}, {:ok, 6}]

See the Relay Plugin Guide for usage and details.

Chunk Worker

Process jobs "broadway style", in groups based on size or a timeout, but with the robust error handling semantics of Oban. Chunking operates at the worker level, allowing many chunks to run in parallel within the same queue.

defmodule MyApp.ChunkWorker do
  use Oban.Pro.Workers.Chunk, queue: :alpha, size: 10, timeout: :timer.seconds(5)

  @impl Chunk
  def process(jobs) do
    jobs
    |> Enum.map(& &1.args)
    |> Business.process_messages()

    :ok
  end
end

See the Chunk Worker Guide for details on chunk management, error handling, and more.

Pro Worker Testing Module

Testing batch, workflow, and chunk workers can be tricky because they override the perform/1 function. The new Oban.Pro.Testing module provides a process_job/2,3 function that mirrors the functionality of perform_job/2,3 in the base Oban.Testing module.

With process_job/2,3 you can test Pro workers in isolation, without worrying about whether a manager is running or the state of a workflow. To start using it, import it into your test case or test file:

import Oban.Pro.Testing

Then test worker logic in isolation:

assert :ok = process_job(MyApp.ProWorker, %{"id" => 1})

Changed

  • [Oban.Pro.Plugins.WorkflowManager] This plugin is no longer necessary and can be safely removed from your configuration.

  • Bump Oban dependency to ~> 2.6

v0.6.2 — 2021-03-02

Changed

  • [Oban.Pro.Plugins.BatchManager] Drastically speed up batch state checks to prevent timeouts with large batch sizes. The updated query is up to 40x faster under normal workloads.

  • [Oban.Pro.Plugins.DynamicCron] Inject a dynamic cron's name as "cron_name" into inserted job meta to aid in searching, and eventually linking, jobs back to the dynamic cron record.

  • [Oban.Pro.Plugins.DynamicCron] Schedule cron insertion at the top of the next minute, identically to recent changes in Oban.Plugins.Cron.

  • Loosen Oban constraint to allow v2.5.0

Fixed

  • [Oban.Pro.Plugins.Reprioritizer] Rename config to conf in event meta so that it matches all other plugins.

  • Specify a minimum of Elixir v1.9 and avoid features from later Elixir versions, namely Keyword.pop! which was added in Elixir v1.10.

v0.6.1 — 2021-01-28

Fixed

  • [Oban.Pro.Plugins.Lifeline] Consider fully qualified node identities when rescuing potential orphans. The previous version could erroneously rescue executing jobs.

v0.6.0 — 2021-01-26

Changed

  • [Oban.Pro.Plugins.Lifeline] Record fully qualified node identities (combining the Oban supervisor name and the host name). This allows Oban Web to track beats recorded by the same host but different instances.

  • Enhance all plugin span metadata with details such as how many jobs were pruned, inserted, reprioritized, etc. Telemetry events and metadata for all Pro plugins now match the events from Oban OSS.

  • Bump Oban dependency to ~> 2.4.0 due to cron parser changes and improved telemetry span events.

Fixed

  • [Oban.Pro.Workers.Workflow] Correct typespecs for new_workflow/1 and libgraph related functions.

  • [Oban.Pro.Plugins.DynamicCron] Support inserting and retrieving cron records with a custom prefix. All operations are wrapped in a Multi, which ignored the outer transaction's prefix.

v0.5.3 — 2020-12-09

Fixed

  • [Oban.Pro.Plugins.DynamicPruner] Consider cancelled jobs that were never attempted when pruning.

v0.5.2 — 2020-11-27

Fixed

  • [Oban.Pro.Plugins.BatchManager] Check for the correct output from Worker.from_string, fixing callback insertion.

v0.5.1 — 2020-11-06

  • [Oban.Pro.Plugins.DynamicPruner] Allow state overrides when pruning cancelled jobs.

v0.5.0 — 2020-11-06

  • [Oban.Pro.Workers.Workflow] A new worker that composes arbitrary jobs together for directed execution. This allows sequential, fan-out and fan-in workflows that are horizontally distributed and fault tolerant.

    See the Workflow docs for setup and a walk-through with use-cases.

  • Bump Oban dependency to ~> 2.3.0 due to the workflow worker's use of the new meta field in Oban.Job.

v0.4.2 — 2020-10-19

  • [Oban.Pro.Plugins.DynamicPruner] Always delete the oldest jobs when pruning in max_len mode. Previously, when job insertion outpaced pruning, the oldest jobs could persist between prunes.
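    A hypothetical sketch of the intended max_len behavior: keep the newest max_len jobs and prune everything older, so the oldest jobs never linger between prunes. The MaxLenSketch module and the assumption that higher ids are newer are illustrative only, not DynamicPruner's code:

```elixir
defmodule MaxLenSketch do
  # Hypothetical illustration only, not DynamicPruner's code.
  # jobs: maps with an :id, where higher ids are assumed to be newer.
  # Returns the ids to delete, oldest first.
  def ids_to_prune(jobs, max_len) do
    jobs
    |> Enum.sort_by(& &1.id, :desc)
    # Keep the newest max_len jobs; everything after them is older
    |> Enum.drop(max_len)
    |> Enum.map(& &1.id)
    |> Enum.sort()
  end
end
```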

v0.4.1 — 2020-10-11

Changed

v0.4.0 — 2020-10-05

Added

  • [Oban.Pro.Plugins.DynamicCron] The DynamicCron plugin is a replacement for Oban's built-in cron scheduler. It supports adding, updating, deleting, pausing and resuming periodic jobs at runtime and boot time. It is an ideal solution for applications that must dynamically start and manage scheduled tasks at runtime.

    See the DynamicCron installation docs for instructions on getting started.

  • [Oban.Pro.Plugins.Lifeline] Add :retry_exhausted option, which will retry exhausted jobs rather than discard them while rescuing.
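A minimal configuration sketch, assuming the option takes a boolean and that Lifeline is listed under plugins like other Pro plugins:

```elixir
config :my_app, Oban,
  plugins: [
    # Retry exhausted jobs instead of discarding them while rescuing orphans
    {Oban.Pro.Plugins.Lifeline, retry_exhausted: true}
  ]
```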

Breaking Change

  • Emit standardized [:oban, :plugin, :start | :stop | :exception] events for all plugins. Previously, each plugin would emit its own event, which required extensive :telemetry.attach_many calls. Now all plugins have a standard event name with differentiated metadata, including a :plugin key with the module name responsible for emitting the event.

    To upgrade, replace any events that used :prune, :lifeline, or :reprioritizer with :plugin. For example, [:oban, :prune, :start] is now [:oban, :plugin, :start] with the metadata %{plugin: DynamicPruner}.
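    After upgrading, a single handler can serve every plugin by matching on the :plugin metadata key. This is a hypothetical sketch: the module name is made up, and the :duration measurement (in native time units) on stop events is an assumption rather than something this changelog documents:

```elixir
defmodule MyApp.PluginTelemetry do
  require Logger

  # Hypothetical handler for [:oban, :plugin, :stop]; one handler covers all
  # plugins because the emitting module arrives in the :plugin metadata key.
  def handle_event([:oban, :plugin, :stop] = event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  def describe([:oban, :plugin, :stop], measurements, %{plugin: plugin}) do
    # Assumes a native-unit :duration measurement, defaulting to 0 if absent
    native = Map.get(measurements, :duration, 0)
    ms = System.convert_time_unit(native, :native, :millisecond)
    "#{inspect(plugin)} finished in #{ms}ms"
  end
end
```

    Attach it once with :telemetry.attach/4, for example :telemetry.attach("pro-plugin-logger", [:oban, :plugin, :stop], &MyApp.PluginTelemetry.handle_event/4, nil).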

v0.3.2 — 2020-08-28

  • [Oban.Pro.Plugins.DynamicPruner] Correctly prune discarded jobs that were cancelled before they were ever attempted.

v0.3.1 — 2020-08-05

Fixed

  • [Oban.Pro.Workers.Batch] Only invoke batch callbacks that are defined for the current worker.

v0.3.0 — 2020-07-10

Fixed

  • [Oban.Pro.Plugins.Lifeline] Ignore unknown messages to the plugin to prevent crashes.

Changed

  • Bump Oban dependency to 2.0.0

v0.2.1 — 2020-07-01

Added

  • Add handle_exhausted/1 callback to the Oban.Pro.Workers.Batch worker. This new callback is triggered when all jobs are either completed or discarded.

v0.2.0 — 2020-06-12

Fixed

  • Conditional improvements to the BatchManager plugin

Changed

  • Bump Oban dependency to 2.0.0-rc.1

v0.1.0 — 2020-06-03

Initial release including these plugins:

  • Oban.Pro.Plugins.BatchManager
  • Oban.Pro.Plugins.DynamicPruner
  • Oban.Pro.Plugins.Lifeline
  • Oban.Pro.Plugins.Reprioritizer

As well as the Oban.Pro.Workers.Batch worker.