View Source Smart Engine

🌟 This engine is available through Oban.Pro

The SmartEngine enables truly global concurrency and global rate limiting by handling demand at the queue level, without any conflict or churn. The engine uses centralized producer records to coordinate with minimal load on the database.

Engines are callback modules that coordinate how queue producers handle demand and interact the database.

installation

Installation

The SmartEngine relies on producer records, stored in the oban_producers table. To start, create a migration to create an oban_producers table:

$ mix ecto.gen.migration add_oban_producers

Within the migration module:

use Ecto.Migration

defdelegate change, to: Oban.Pro.Migrations.Producers

If you have multiple Oban instances or use prefixes, you can specify the prefix and create multiple tables in one migration:

use Ecto.Migration

def change do
  Oban.Pro.Migrations.Producers.change()
  Oban.Pro.Migrations.Producers.change(prefix: "special")
  Oban.Pro.Migrations.Producers.change(prefix: "private")
end

The Producers migration also exposes up/0 and down/0 functions if change/0 doesn't fit your usecase.

Next, update your config to use the SmartEngine:

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  ...

If you have multiple Oban instances you'll need to configure each one to use the SmartEngine, otherwise they'll default to the BasicEngine.

usage-and-configuration

Usage and Configuration

With the SmartEngine in place you can define global concurrency limits and rate limits at the queue level. The engine supports the basic queue: limit format as well as the expanded format. Let's look at a few examples:

queues: [
  alpha: 1,
  gamma: [global_limit: 1],
  delta: [local_limit: 2, global_limit: 5],
  kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
  omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
]
  • local_limit — This limits the number of concurrent jobs that run within a single node. Typically the global concurrency is local_limit * num_nodes. For example, with three nodes and a local limit of 10, you'll have a global limit of 30.

  • global_limit — This limits the number of concurrent jobs that run across all nodes. The queue will process at most the local limit or global limit, whichever is lower. The only way to guarantee that all connected nodes will run exactly one job concurrently is to set global_limit: 1.

  • rate_limit — This limits the number of jobs that are processed within a sliding window of time, across all nodes. Rate limiting uses counts from all other producer records for the same queue in the cluster. The limiter uses a sliding window over the configured period to accurately approximate a limit.

Without a modifier the rate_limit period is defined in seconds. You can provide a :second, :minute, :hour or :day modifier to use more intuitive values. Here are a few examples:

  • period: 30 — 30 seconds
  • period: {1, :minute} — 60 seconds
  • period: {2, :minutes} — 120 seconds
  • period: {1, :hour} — 3,600 seconds
  • period: {1, :day} —86,400 seconds

All limits stack and the queue will only fetch the minimum demand.

partitioned-rate-limiting

Partitioned Rate-Limiting

In addition to queue-wide rate limiting, you can also rate-limit by worker and/or args within a queue. This is called "partitioned" rate-limiting, and it's configured through the partition option.

For example, to allow one job per-worker, every five seconds, across every instance of the alpha queue in your cluster:

queues: [
  alpha: [
    local_limit: 10,
    rate_limit: [allowed: 1, period: 5, partition: [fields: [:worker]]]
  ]
]

The following example partitions by the user_id of args instead:

rate_limit: [allowed: 1, period: 5, partition: [fields: [:args], keys: [:user_id]]

Here are a few more examples of viable partitioning schemes:

  • fields: [:args] — partition by the entire args map, not a few select keys

  • fields: [:worker, :args] — partition by both worker and all args

  • fields: [:args], keys: [:id, :account_id] — partition by the id and account_ids keys

implementation-notes

Implementation Notes

  • The DynamicLifeline plugin cleans up producers that haven't been updated for 1 minute or more. When an event doesn't have any jobs to produce it will automatically "touch" the producer record to prevent deletion.

  • Queue state is stored in a producer record's meta column in the database, where you can easily check values such as the current rate limit counts.

  • Rate limiting uses an efficient sliding window algorithm to safely handle large rate periods, e.g. multiple hours or days.

  • Partitioned rate limiting tracks each partition separately as a "window" within the producer record. Minimize partition cardinality by using keys whenever possible.