Changelog View Source
All notable changes to Oban.Pro
are documented here.
v0.9.4 — 2021-12-19
[BatchManager] Consider all states for batch callback uniquness.
Previously, if a callback failed enough it could be discarded and not considered for subsequent uniquness checks.
v0.9.3 — 2021-11-19
Added
- [DynamicPruner] Support
:infinity
as duration in dynamic pruning so users don't have to specify ludicrous values like{999, :years}
.
Fixed
[Relay] Attach the
Relay
telemetry handler using module function capture syntax to prevent warnings.[DynamicCron] Include
:expression
as an available update option to prevent dialyzer errors.[SmartEngine] Preserve existing rate limit fields when scaling or otherwise changing a producer's
meta
values.[SmartEngine] Ensure that the total fetch demand is never negative.
When running queues are converted to global mode there may be a defecit between the total jobs and the global limit. In that case we must fetch
0
jobs rather than passing a negative number
v0.9.2 — 2021-10-13
Fixed
[SmartEngine] Verify the presence of a rate-limit period before calculating window time. This fixes situations where a producer record for the same queue existed, but lacked a rate limit structure.
[Producer] Improve legacy Ecto support by lazily calculating the local limit, without assuming the params are coerced into a map.
v0.9.1 — 2021-09-26
Fixed
- [SmartEngine] Respect configured log level when performing transaction lock
queries. Previously, the Smart Engine could log every time it made an advisory
lock query with a
global_limit
orrate_limit
set.
v0.9.0 — 2021-09-15
Partitioned Rate Limiting
Driven by popular demand, v0.9 brings partitions to the Smart Engine's rate limiter. With partitions, rate limits are applied per-worker, on args, or on a subset of args fields rather than across the entire queue. This enables your application to enforce limits per-customer or respect external throttling, without splitting jobs into multiple queues.
rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]
Alternatively, you can partition by the job's account_id
field:
rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]
Naturally, a combination of worker
, args
, and any number of keys
works.
Check out the SmartEngine Guide for options and details.
Batch Worker Improvements
Batches are the oldest worker in Pro, and as such, they existed prior to the
meta
field. Finally, that difference is rectified. Building batches on meta
enables a handful of ergonomic improvements and new functionality:
- Forwarding
args
through thebatch_callback_args
option - Heterogeneous batches with an alternative callback module through
the
batch_callback_worker
option - Fetch all jobs in a batch with
stream_batch_jobs/2
for map/reduce processing
Check out the updated Batch Guide for more examples, configuration options, use cases, and typespecs.
IMPORTANT: For in-flight batch callback jobs to run after an upgrade you'll
need to migrate batch_id
and callback
into meta
. Run the following SQL in
a migration immediately prior to, or following, the upgrade.
UPDATE oban_jobs
SET meta = jsonb_set_lax(
jsonb_set(meta, '{batch_id}', args->'batch_id'),
'{callback}',
args->'callback',
true,
'return_target'
)
WHERE state NOT IN ('cancelled', 'completed', 'discarded')
AND args ? 'batch_id'
Workflow Worker Improvements
Workflows got a little ergonomic love, too. Now you can dynamically extend
workflows at runtime with the new append_workflow
function:
def process(%Job{} = job) do
jobs =
job
|> append_workflow(check_deps: false)
|> add(:d, WorkerD.new(%{}), deps: [:a])
|> add(:e, WorkerE.new(%{}), deps: [:b])
|> add(:f, WorkerF.new(%{}), deps: [:c])
|> Oban.insert_all()
{:ok, jobs}
end
Note the use of check_deps: false
to prevent dependency validation. To be safe
and check jobs while appending, we'll use the new stream_workflow_jobs/1
function to load all of the previous jobs and feed them in:
def process(%Job{} = job) do
{:ok, jobs} =
MyApp.Repo.transaction(fn ->
job
|> stream_workflow_jobs()
|> Enum.to_list()
end)
jobs
|> append_workflow()
|> add(:d, WorkerD.new(%{}), deps: [:a])
|> add(:e, WorkerE.new(%{}), deps: [:b])
|> add(:f, WorkerF.new(%{}), deps: [:c])
|> Oban.insert_all()
:ok
end
Check out the updated Workflow Guide for details on appending to workflows, job streaming, and complete typespecs.
Changed
[Oban] Require Oban
~> v2.9
to support the newcancel_all_jobs
engine callback.[Oban.Pro.Queue.SmartEngine] Make operations like
refresh
more efficient (less data transport) and more failure tolerant to prevent producer crashes.
v0.8.2 — 2021-07-30
Fixed
[Workflow] Ensure a minimum snooze when all deps are scheduled.
When an upstream dependency was scheduled to run right now, or sometime in the past, it would return an invalid value like
{:snooze, 0}
. In that case the result was considered a success and the job wouldn't run as expected.[DynamicCron] Prevent crashing when a cron's worker module is missing. This could happen during rolling or blue/green deploys.
[PG] Correct
ident
check when receiving scoped messages for pause, resume, or scale type actions.
v0.8.1 — 2021-06-01
Fixed
[SmartEngine] Correctly track the rate limit window between days.
Rate limited queues could get stuck when fetch invocations straddled midnight. In that scenario the queue wouldn't start fetching more until the clock time for the current day exceeded the last recorded time from the previous day.
[SmartEngine] Silence operations in multi transactions. The
log
level wasn't respected for some frequently used operations.[Relay] Remove "local" optimization that could incorrectly prevent results from getting back to the calling node.
v0.8.0 — 2021-05-25
Added
[PG] An alternate
Oban.Notifier
implementation based onpg/pg2
for use with Distributed Erlang.[DynamicCron] Store timezone overrides for individual cron entries.
Along with the a timezone for the DynamicCron plugin, individual cron entries now support a timezone override. The timezone can be set when inserting or updating cron entries.
Changed
- [Relay] Decode relay results using the
:safe
option to prevent possible atom table exhaustion or remote code injection.
Fixed
- [SmartEngine] Implementing a
cancel_job/2
callback within the engine makes cancelling jobs safe. Previously cancelling a job could leave the job id in the list of running jobs, which could eventually exhaust the concurrency limit and prevent a queue from running new jobs.
v0.7.3 — 2021-05-19
Changed
[SmartEngine] Use database connection safety for all SmartEngine calls.
Database connections may be unstable for many reasons: pool contention, database overload, crashes. We shouldn't crash the producer due to a flickering database call.
Fixed
- [DynamicCron] Explicitly declare cron timestamps as
DateTime
to compensate for apps that configureEcto
to useNaiveDateTime
by default.
v0.7.2 — 2021-04-11
Changed
[Lifeline] The plugin will only run along with the SmartEngine. Running with the BasicEngine is flawed and will cause inadvertent orphan rescues.
You can either switch to the SmartEngine or remove Lifeline from your plugins.
Fixed
- [SmartEngine] Consider the configured prefix for all ack and update options.
v0.7.1 — 2021-04-07
Changed
[Relay] Restore the ability to use Elixir 1.9 by removing use of the
is_map_key
guard that wasn't introduced until Elixir 1.10.[SmartEngine] Apply jitter and more selective use of the mutex used for global coordination. The engine also avoids touching or reloading producer records to minimize connection use.
[Workflow] Improve deps check performance by using an index powered operator. On instances with millions of jobs this may improve performance ~600x.
v0.7.0 — 2021-04-02
Pluggable Smart Engine
Pro's SmartEngine
enables truly global concurrency and global rate limiting by
handling demand at the queue level, without any conflict or churn. The engine
uses centralized producer records to coordinate with minimal load on the
database.
config :my_app, Oban,
engine: Oban.Pro.Queue.SmartEngine,
queues: [
alpha: 1,
gamma: [global_limit: 1],
delta: [local_limit: 2, global_limit: 5],
kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
],
...
Check out the SmartEngine Guide for a walkthrough and details.
Relay Plugin
The Relay plugin lets you insert and await the results of jobs locally or remotely, across any number of nodes, i.e. persistent distributed tasks. Once the plugin is running, you can seamlessly distribute oban jobs and await the results synchronously:
alias Oban.Pro.Plugins.Relay
1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Relay.async/1)
|> Relay.await_many(timeout: :timer.seconds(1))
# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
See the Relay Plugin Guide for usage and details.
Chunk Worker
Process jobs "broadway style", in groups based on size or a timeout, but with the robust error handling semantics of Oban. Chunking operates at the worker level, allowing many chunks to run in parallel within the same queue.
defmodule MyApp.ChunkWorker do
use Oban.Pro.Workers.Chunk, queue: :alpha, size: 10, timeout: :timer.seconds(5)
@impl Chunk
def process(jobs) do
jobs
|> Enum.map(& &1.args)
|> Business.process_messages()
:ok
end
end
See the Chunk Worker Guide for details on chunk managment, error handling, and more.
Pro Worker Testing Module
Testing batch, workflows and chunk workers can be tricky due to how they
override the perform/1
function. The new Oban.Pro.Testing
module provides a
process_job/2,3
function that mirrors the functionality of perform_job/2,3
in the base Oban.Testing
module.
With process_job/2,3
you can test pro workers in isolation, without worrying
about whether a manager is running or the state of a workflow. To start using
it, import it into your test case or test file:
import Oban.Pro.Testing
Then test worker logic in isolation:
assert :ok = process_job(MyApp.ProWorker, %{"id" => 1})
Changed
[Oban.Pro.Plugins.WorkflowManager] This plugin is no longer necessary and can be safely removed from your configuration.
Bump Oban dependency to
~> 2.6
v0.6.2 — 2021-03-02
Changed
[Oban.Pro.Plugins.BatchManager] Drastically speed up batch state checks to prevent timeouts with large batch sizes. The updated query is up to 40x faster under normal workloads.
[Oban.Pro.Plugins.DynamicCron] Inject a dynamic cron's name as
"cron_name"
into inserted job meta to aid in searching, and eventually linking, jobs back to the dynamic cron record.[Oban.Pro.Plugins.DynamicCron] Schedule cron insertion at the top of the next minute, identically to recent changes in
Oban.Plugins.Cron
.Loosen Oban constraint to allow
v2.5.0
Fixed
[Oban.Plugins.Reprioritizer] Rename
config
toconf
in event meta so that it matches all other plugins.Specify a minimum of Elixir v1.9 and avoid features from later Elixir versions, namely
Keyword.pop!
which was added in Elixir v1.10.
v0.6.1 — 2021-01-28
Fixed
- [Oban.Pro.Plugins.Lifeline] Consider fully qualified node identities when rescuing potential orphans. The previous version could erroneously rescue executing jobs.
v0.6.0 — 2021-01-26
Changed
[Oban.Pro.Plugins.Lifeline] Record fully qualified node identities (combining the Oban supervisor name and the host name). This allows Oban Web to track beats recorded by the same host but different instances.
Enhance all plugin span metadata with details such as how many jobs were pruned, inserted, reprioritized, etc. Telemetry events and metadata for all Pro plugins now matches the events from Oban OSS.
Bump Oban dependency to
~> 2.4.0
due to CRON parser changes and improved telemetry span events.
Fixed
[Oban.Pro.Plugins.Workflow] Correct typespecs for
new_workflow/1
andlibgraph
related functions.[Oban.Pro.Plugins.DynamicCron] Support inserting and retrieving cron records with a custom prefix. All operations are wrapped in a Multi, which ignored the outer transaction's prefix.
v0.5.3 — 2020-12-09
Fixed
- [Oban.Pro.Plugins.DynamicPruner] Consider
cancelled
jobs that were never attempted when pruning.
v0.5.2 — 2020-11-27
Fixed
- [Oban.Pro.Plugins.BatchManager] Check for the correct output from
Worker.from_string
, fixing callback insertion.
v0.5.1 — 2020-11-06
- [Oban.Pro.Plugins.DynamicPruner] Allow state overrides when pruning
cancelled
jobs.
v0.5.0 — 2020-11-06
[Oban.Pro.Workers.Workflow] A new worker that composes arbitrary jobs together for directed execution. This allows sequential, fan-out and fan-in workflows that are horizontally distributed and fault tolerant.
See the Workflow docs for setup and a walk-through with use-cases.
Bump Oban dependency to
~> 2.3.0
due to the workflow worker's use of the newmeta
field inOban.Job
.
v0.4.2 — 2020-10-19
- [Oban.Pro.Plugins.DynamicPruner] Always delete oldest job when pruning in
max_len
mode. Inmax_len
mode, when job insertion exceeded job pruning the oldest jobs would persist between prunes.
v0.4.1 — 2020-10-11
Changed
- Upgrade Oban dependency to
~> 2.2.0
along with fixes for the move toOban.Registry
andOban.Repo
.
v0.4.0 — 2020-10-05
Added
[Oban.Pro.Plugins.DynamicCron] The
DynamicCron
plugin is a replacement for Oban's built in cron scheduler. It supports adding, updating, deleting, pausing and resuming periodic jobs at runtime and boot time. It is an ideal solution for applications that must dynamically start and manage scheduled tasks at runtime.See the DynamicCron installation docs for instructions on getting started.
[Oban.Pro.Plugins.Lifeline] Add
:retry_exhausted
option, which will retry exhausted jobs rather than discard them while rescuing.
Breaking Change
Emit standardized
[:oban, :plugin, :start | :stop | :exception]
events for all plugins. Previously, each plugin would emit its own event, which required extensive:telemetry.attach_many
calls. Now all plugins have a standard event name with differentiated metatada, including a:plugin
key with the module name responsible for emitting the event.To upgrade replace any events that used
:prune
,:lifeline
or:reprioritizer
with:plugin
. For example,[:oban, :prune, :start]
is now[:oban, :plugin, :start]
with the metadata%{plugin: DynamicPruner}
.
v0.3.2 — 2020-08-28
- [Oban.Pro.Plugins.DynamicPruner] Correctly prune
discarded
jobs that were cancelled before they were ever attempted.
v0.3.1 — 2020-08-05
Fixed
- [Oban.Pro.Workers.Batch] Only invoke batch callbacks that are defined for the current worker.
v0.3.0 — 2020-07-10
Fixed
- [Oban.Pro.Plugins.Lifeline] Ignore unknown messages to the plugin to prevent crashes.
Changed
- Bump Oban dependency to
2.0.0
v0.2.1 — 2020-07-01
Added
- Add
handle_exhausted/1
callback toOban.Pro.Workers.Batch
worker. This new callback is triggered when all jobs are eithercompleted
ordiscarded
.
v0.2.0 — 2020-06-12
Fixed
- Conditional improvements to the
BatchManager
plugin
Changed
- Bump Oban dependency to
2.0.0-rc.1
v0.1.0 — 2020-06-03
Initial release including these plugins:
Oban.Pro.Plugins.BatchManager
Oban.Pro.Plugins.DynamicPruner
Oban.Pro.Plugins.Lifeline
Oban.Pro.Plugins.Reprioritizer
As well as the Oban.Pro.Workers.Batch
worker.