View Source Smart Engine
🌟 This engine is available through Oban.Pro
The SmartEngine
enables truly global concurrency and global rate limiting by
handling demand at the queue level, without any conflict or churn. The engine
uses centralized producer records to coordinate with minimal load on the
database.
Engines are callback modules that coordinate how queue producers handle demand and interact the database.
installation
Installation
The SmartEngine
relies on producer records, stored in the oban_producers
table. To start, create a migration to create an oban_producers
table:
$ mix ecto.gen.migration add_oban_producers
Within the migration module:
use Ecto.Migration
defdelegate change, to: Oban.Pro.Migrations.Producers
If you have multiple Oban instances or use prefixes, you can specify the prefix and create multiple tables in one migration:
use Ecto.Migration
def change do
Oban.Pro.Migrations.Producers.change()
Oban.Pro.Migrations.Producers.change(prefix: "special")
Oban.Pro.Migrations.Producers.change(prefix: "private")
end
The Producers
migration also exposes up/0
and down/0
functions if
change/0
doesn't fit your usecase.
Next, update your config to use the SmartEngine
:
config :my_app, Oban,
engine: Oban.Pro.Queue.SmartEngine,
...
If you have multiple Oban instances you'll need to configure each one to use the
SmartEngine
, otherwise they'll default to the BasicEngine
.
usage-and-configuration
Usage and Configuration
With the SmartEngine
in place you can define global concurrency limits and
rate limits at the queue level. The engine supports the basic queue: limit
format as well as the expanded format. Let's look at a few examples:
queues: [
alpha: 1,
gamma: [global_limit: 1],
delta: [local_limit: 2, global_limit: 5],
kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
]
local_limit
— This limits the number of concurrent jobs that run within a single node. Typically the global concurrency islocal_limit * num_nodes
. For example, with three nodes and a local limit of 10, you'll have a global limit of 30.global_limit
— This limits the number of concurrent jobs that run across all nodes. The queue will process at most the local limit or global limit, whichever is lower. The only way to guarantee that all connected nodes will run exactly one job concurrently is to setglobal_limit: 1
.rate_limit
— This limits the number of jobs that are processed within a sliding window of time, across all nodes. Rate limiting uses counts from all other producer records for the same queue in the cluster. The limiter uses a sliding window over the configured period to accurately approximate a limit.
Without a modifier the rate_limit
period is defined in seconds. You can
provide a :second
, :minute
, :hour
or :day
modifier to use more intuitive
values. Here are a few examples:
period: 30
— 30 secondsperiod: {1, :minute}
— 60 secondsperiod: {2, :minutes}
— 120 secondsperiod: {1, :hour}
— 3,600 secondsperiod: {1, :day}
—86,400 seconds
All limits stack and the queue will only fetch the minimum demand.
partitioned-rate-limiting
Partitioned Rate-Limiting
In addition to queue-wide rate limiting, you can also rate-limit by worker
and/or args
within a queue. This is called "partitioned" rate-limiting, and
it's configured through the partition
option.
For example, to allow one job per-worker, every five seconds, across every
instance of the alpha
queue in your cluster:
queues: [
alpha: [
local_limit: 10,
rate_limit: [allowed: 1, period: 5, partition: [fields: [:worker]]]
]
]
The following example partitions by the user_id
of args
instead:
rate_limit: [allowed: 1, period: 5, partition: [fields: [:args], keys: [:user_id]]
Here are a few more examples of viable partitioning schemes:
fields: [:args]
— partition by the entire args map, not a few select keysfields: [:worker, :args]
— partition by both worker and all argsfields: [:args], keys: [:id, :account_id]
— partition by theid
andaccount_ids
keys
implementation-notes
Implementation Notes
The
DynamicLifeline
plugin cleans up producers that haven't been updated for 1 minute or more. When an event doesn't have any jobs to produce it will automatically "touch" the producer record to prevent deletion.Queue state is stored in a producer record's
meta
column in the database, where you can easily check values such as the current rate limit counts.Rate limiting uses an efficient sliding window algorithm to safely handle large rate periods, e.g. multiple hours or days.
Partitioned rate limiting tracks each partition separately as a "window" within the producer record. Minimize partition cardinality by using
keys
whenever possible.