Splitting Queues Between Nodes
Running every job queue on every node isn't always ideal. Imagine that your application has some CPU-intensive jobs that you'd prefer not to run on nodes that serve web requests. Or perhaps you start temporary nodes that are only meant to insert jobs and should never execute any. Fortunately, we can control this by configuring certain node types, or even single nodes, to run only a subset of queues.
Use Case: Isolating Video Processing Intensive Jobs
One notorious type of CPU-intensive work is video processing. When our application transcodes multiple videos simultaneously, it is a major drain on system resources and may impact response times. To avoid this, we can run dedicated worker nodes that don't serve any web requests and handle all of the transcoding.
While it's possible to separate our system into web and worker apps within an umbrella, that wouldn't allow us to dynamically change queues at runtime. Let's look at an environment variable based method for dynamically configuring queues at runtime.
Within config.exs our application is configured to run three queues: default, media and events:
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 15, media: 10, events: 25]
We will use an OBAN_QUEUES environment variable to override the queues at runtime. For illustration purposes the queue parsing all happens within the application module, but it would work equally well in releases.exs (or runtime.exs).
defmodule MyApp.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      MyApp.Repo,
      MyApp.Endpoint,
      {Oban, oban_opts()}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end

  # Build the Oban options, letting OBAN_QUEUES override the configured queues.
  defp oban_opts do
    env_queues = System.get_env("OBAN_QUEUES")

    :my_app
    |> Application.get_env(Oban)
    |> Keyword.update(:queues, [], &queues(env_queues, &1))
  end

  # "*" or an unset variable: keep the configured queues.
  defp queues("*", defaults), do: defaults
  defp queues(nil, defaults), do: defaults
  # Queues disabled in configuration (e.g. for testing): keep them disabled.
  defp queues(_, false), do: false

  # Parse space-separated "queue,limit" pairs into a keyword list.
  defp queues(values, _defaults) when is_binary(values) do
    values
    |> String.split(" ", trim: true)
    |> Enum.map(&String.split(&1, ",", trim: true))
    |> Keyword.new(fn [queue, limit] ->
      {String.to_existing_atom(queue), String.to_integer(limit)}
    end)
  end
end
The queues function's first three clauses ensure that we can fall back to the queues specified in our configuration (or false, for testing). The fourth clause is much more involved, and that is where the environment parsing happens. It expects the OBAN_QUEUES value to be a string formatted as queue,limit pairs and separated by spaces. For example, to run only the default and media queues with a limit of 5 and 10 respectively, you would pass the string "default,5 media,10".
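To make the four clauses concrete, here is a minimal standalone sketch that can be run as a script. The QueueOverride module name and its resolve/2 and defaults/0 functions are hypothetical stand-ins for the private queues/2 function in the application module; the default limits simply mirror the configuration shown earlier.

```elixir
defmodule QueueOverride do
  @moduledoc false

  # Hypothetical defaults mirroring the config.exs shown earlier. Listing the
  # queue names here also guarantees the atoms exist before any parsing.
  @defaults [default: 15, media: 10, events: 25]

  def defaults, do: @defaults

  # "*" or an unset variable keeps the configured queues.
  def resolve("*", defaults), do: defaults
  def resolve(nil, defaults), do: defaults

  # Queues disabled in configuration stay disabled.
  def resolve(_, false), do: false

  # Anything else is parsed as space-separated "queue,limit" pairs.
  def resolve(values, _defaults) when is_binary(values) do
    values
    |> String.split(" ", trim: true)
    |> Enum.map(&String.split(&1, ",", trim: true))
    |> Keyword.new(fn [queue, limit] ->
      {String.to_existing_atom(queue), String.to_integer(limit)}
    end)
  end
end

defaults = QueueOverride.defaults()

IO.inspect(QueueOverride.resolve("default,5 media,10", defaults))
# prints [default: 5, media: 10]

IO.inspect(QueueOverride.resolve("*", defaults))
# prints [default: 15, media: 10, events: 25]

IO.inspect(QueueOverride.resolve(nil, defaults))
# prints [default: 15, media: 10, events: 25]

IO.inspect(QueueOverride.resolve("media,10", false))
# prints false
```

The clause order matters: the false check sits before the parsing clause, so an override string never re-enables queues that the configuration turned off.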
Note that the parsing clause has a couple of safety mechanisms to ensure that only real queues are specified:
- It automatically trims while splitting values, so extra whitespace won't break parsing (e.g. stray spaces before or after default,3)
- It only converts the queue string to an existing atom, so a typo (e.g. defalt) raises an error rather than starting an unintended queue
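Both safety mechanisms come from standard library behavior and can be checked in isolation. The snippet below is only an illustrative sketch: the known_queues variable and the :not_an_existing_atom marker are hypothetical names introduced for the demonstration.

```elixir
# The queue names from this guide; mentioning them ensures the atoms exist.
known_queues = [:default, :media, :events]

# Extra spaces produce empty strings, which `trim: true` discards.
pairs = String.split("  default,3   media,5 ", " ", trim: true)
IO.inspect(pairs)
# prints ["default,3", "media,5"]

# A known queue name converts to its existing atom.
IO.inspect(String.to_existing_atom("default") in known_queues)
# prints true

# A misspelled name has no matching atom, so the conversion raises
# ArgumentError instead of silently creating a new queue name.
typo_result =
  try do
    String.to_existing_atom("defalt")
  rescue
    ArgumentError -> :not_an_existing_atom
  end

IO.inspect(typo_result)
# prints :not_an_existing_atom
```

Because the parsing in this guide runs while the application starts, such an error would surface at boot rather than later at runtime.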
Usage Examples
In development (or when using mix rather than releases) we can specify the environment variable inline:
OBAN_QUEUES="default,10 media,5" mix phx.server # default: 10, media: 5
We can also explicitly opt in to running all of the configured queues:
OBAN_QUEUES="*" mix phx.server # default: 15, media: 10, events: 25
Finally, without OBAN_QUEUES set at all it will implicitly fall back to the configured queues:
mix phx.server # default: 15, media: 10, events: 25
Flexible Across all Environments
This environment variable based solution is more flexible than running separate umbrella apps because we can reconfigure at any time. In a limited environment, like staging, we can run all the queues on a single node using the exact same code we use in production. In the future, if other workers start to use too much CPU or RAM, we can shift them to the worker node without any code changes.
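As noted earlier, the same parsing could live in runtime.exs instead of the application module. The following is a rough sketch of that variant, not a drop-in replacement. It assumes the queue defaults are defined only in this file: keyword configuration is deep-merged across config files, so an override that lists fewer queues would not remove queues already set in config.exs. The default_queues variable is a hypothetical name, and skipping the test environment is an assumption made so that a queues: false test setting is left alone.

```elixir
# config/runtime.exs (sketch; assumes :queues is not also set in config.exs)
import Config

# Hypothetical defaults, mirroring the limits used throughout this guide.
default_queues = [default: 15, media: 10, events: 25]

queues =
  case System.get_env("OBAN_QUEUES") do
    # Unset or "*": run every default queue.
    value when value in [nil, "*"] ->
      default_queues

    # Otherwise parse space-separated "queue,limit" pairs.
    value ->
      value
      |> String.split(" ", trim: true)
      |> Enum.map(&String.split(&1, ",", trim: true))
      |> Keyword.new(fn [queue, limit] ->
        {String.to_existing_atom(queue), String.to_integer(limit)}
      end)
  end

# Leave the test environment untouched so its own queue settings still apply.
if config_env() != :test do
  config :my_app, Oban, queues: queues
end
```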
This guide was prompted by an inquiry on the Oban issue tracker.