Dynamic Cron Plugin

🌟 This plugin is available through Oban.Pro

The DynamicCron plugin enhances Oban's built-in cron scheduler by making it configurable at runtime, globally, across your entire cluster. DynamicCron supports adding, updating, deleting, and pausing cron entries at boot time or at runtime. It is an ideal solution for applications that must dynamically start and manage scheduled tasks.

Installation

Before running the DynamicCron plugin you must run a migration to add the oban_cron table to your database.

mix ecto.gen.migration add_oban_cron

Open the generated migration in your editor and call the change function on Oban.Pro.Migrations.DynamicCron:

defmodule MyApp.Repo.Migrations.AddObanCron do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
end

As with the base Oban tables you can optionally provide a prefix to "namespace" the table within your database. Here we specify a "private" prefix:

defmodule MyApp.Repo.Migrations.AddObanCron do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicCron.change("private")
end

Run the migration to create the table:

mix ecto.migrate

Now we can use the DynamicCron plugin and start scheduling periodic jobs!

Using and Configuring

To begin using DynamicCron, add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicCron]
  ...

By itself, without providing a crontab or dynamically inserting cron entries, the plugin doesn't have anything to schedule. To get scheduling started, provide a list of {cron, worker} or {cron, worker, options} tuples to the plugin. The syntax is identical to Oban's built-in :crontab option, which means you can copy an existing standard :crontab list into the plugin's :crontab.

plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  timezone: "America/Chicago",
  crontab: [
    {"* * * * *", MyApp.MinuteJob},
    {"0 * * * *", MyApp.HourlyJob, queue: :scheduled},
    {"0 0 * * *", MyApp.DailyJob, max_attempts: 1},
    {"0 12 * * MON", MyApp.MondayWorker, tags: ["scheduled"]},
    {"@daily", MyApp.AnotherDailyWorker}
  ]
}]

For more details about periodic jobs and cron expressions see the documentation on Periodic Jobs.

Now, when the DynamicCron plugin initializes, it will persist those cron entries to the database and start scheduling them according to their cron expressions. The plugin's crontab format is nearly identical to Oban's standard crontab, with a few important enhancements we'll look at soon.

Each of the crontab entries is persisted to the database and referenced globally by all connected Oban instances. That allows us to insert, update, or delete cron entries at any time. In fact, changing the schedule or options of an entry in the crontab provided to the plugin will automatically update the persisted entry. To demonstrate, let's modify the MinuteJob we specified so that it runs every other minute in the :scheduled queue:

crontab: [
  {"*/2 * * * *", MyApp.MinuteJob, name: "frequent", queue: :scheduled},
  ...
]

Now it isn't really a "minute job" any more, and the name is no longer suitable. That is why we added a :name override, so that we could update the name of the worker module as well.

crontab: [
  {"*/2 * * * *", MyApp.FrequentJob, name: "frequent", queue: :scheduled},
  ...
]

All entries are referenced by name, which defaults to the worker's name and must be unique. You may define the same worker multiple times as long as you provide a name override:

crontab: [
  {"*/3 * * * *", MyApp.BasicJob, name: "client-1", args: %{client_id: 1}},
  {"*/3 * * * *", MyApp.BasicJob, name: "client-2", args: %{client_id: 2}},
  ...
]
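Because names must be unique, it can help to check a crontab list for collisions before handing it to the plugin. The following is a minimal sketch, not part of Oban or Oban Pro: the CrontabCheck module and its functions are hypothetical, and it assumes that an entry without a :name option is identified by its worker module's name as a string.

```elixir
defmodule CrontabCheck do
  @moduledoc """
  Hypothetical helper (not part of Oban or Oban Pro) that reports entry
  names used more than once in a crontab list.
  """

  # Resolve an entry's name: the :name option when present, otherwise
  # the worker module (assumption: compared by its printed name).
  def entry_name({_expr, worker}), do: name_to_string(worker)

  def entry_name({_expr, worker, opts}) do
    opts |> Keyword.get(:name, worker) |> name_to_string()
  end

  defp name_to_string(name) when is_binary(name), do: name
  defp name_to_string(name) when is_atom(name), do: inspect(name)

  # Return the sorted list of names that occur more than once.
  def duplicate_names(crontab) do
    crontab
    |> Enum.map(&entry_name/1)
    |> Enum.frequencies()
    |> Enum.filter(fn {_name, count} -> count > 1 end)
    |> Enum.map(fn {name, _count} -> name end)
    |> Enum.sort()
  end
end
```

An empty list from CrontabCheck.duplicate_names/1 means every entry resolves to a distinct name under the assumption above.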

To temporarily disable scheduling jobs you can set the paused flag:

crontab: [
  {"* * * * *", MyApp.BasicJob, paused: true},
  ...
]

To resume the job you must supply paused: false (or use update/2 to resume it manually). Simply removing the paused option will have no effect.

crontab: [
  {"* * * * *", MyApp.BasicJob, paused: false},
  ...
]

It is also possible to delete a persisted entry during initialization by passing the :delete option:

crontab: [
  {"* * * * *", MyApp.MinuteJob, delete: true},
  ...
]

One or more entries can be deleted this way. Deleting entries is idempotent: nothing will happen if no matching entry can be found.

You can also use :delete to rename entries, if necessary:

crontab: [
  {"* * * * *", MyApp.FrequentJob, name: "frequent", delete: true},
  {"* * * * *", MyApp.FrequentJob, name: "frequently"},
  ...
]

However, it is recommended that you use the update/2 function to rename entries instead.

In the next section we'll look at how to list, insert, update, and delete cron entries dynamically at runtime.

Runtime Updates

Dynamic cron entries are persisted to the database, making it easy to manipulate them through typical CRUD operations. The DynamicCron plugin provides convenience functions to simplify working with those operations. In this section we'll walk through each of the available functions and look at some examples.

Typespecs

📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.

@type cron_expr :: binary()
@type cron_name :: binary() | atom()
@type cron_opt ::
        {:args, Oban.Job.args()}
        | {:max_attempts, pos_integer()}
        | {:paused, boolean()}
        | {:priority, 0..3}
        | {:name, cron_name()}
        | {:queue, atom() | binary()}
        | {:tags, Oban.Job.tags()}
        | {:worker, module()}
@type cron_input :: {cron_expr(), module()} | {cron_expr(), module(), [cron_opt()]}

Inserting Entries

@spec insert([cron_input()]) :: {:ok, [CronEntry.t()]} | {:error, Ecto.Changeset.t()}

The insert/1 function takes a list of one or more tuples with the same {expression, worker} or {expression, worker, options} format as the plugin's crontab option:

DynamicCron.insert([
  {"0 0 * * *", MyApp.GenericWorker},
  {"* * * * *", MyApp.ClientWorker, name: "client-1", args: %{client_id: 1}},
  {"* * * * *", MyApp.ClientWorker, name: "client-2", args: %{client_id: 2}},
  {"* * * * *", MyApp.ClientWorker, name: "client-3", args: %{client_id: 3}}
])

Be aware that insert/1 acts like an "upsert", making it possible to modify existing entries if the worker or name matches. Still, it is better to use update/2 to make targeted updates.
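When the entries follow a pattern, such as one per client, the tuples can be generated rather than written by hand. The sketch below is illustrative only: the ClientCron module, its function names, and the default expression are hypothetical, while the tuple shape and the {:ok, entries} | {:error, changeset} return values follow the spec shown above.

```elixir
defmodule ClientCron do
  # Hypothetical helper module; only DynamicCron.insert/1 comes from the plugin.
  alias Oban.Pro.Plugins.DynamicCron

  # Build one {expression, worker, options} tuple per client id.
  def entries(client_ids, expression \\ "* * * * *") do
    for id <- client_ids do
      {expression, MyApp.ClientWorker, name: "client-#{id}", args: %{client_id: id}}
    end
  end

  # Insert the generated entries and report how many were persisted.
  # Since insert/1 behaves like an upsert, existing entries that match by
  # name are modified rather than duplicated.
  def schedule(client_ids) do
    case DynamicCron.insert(entries(client_ids)) do
      {:ok, inserted} -> {:ok, length(inserted)}
      {:error, changeset} -> {:error, changeset}
    end
  end
end
```

Keeping entries/2 free of side effects makes the generated list easy to inspect before anything is written to the database.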

Updating Entries

@spec update(cron_name(), [cron_opt()]) :: {:ok, CronEntry.t()} | {:error, Ecto.Changeset.t()}

The update/2 function updates a single cron entry, as identified by the worker or name. Any option available when specifying an entry in the crontab list or when calling insert/1 can be updated, including the cron expression and the worker.

The following call demonstrates updating every possible option:

{:ok, _} =
  DynamicCron.update(
    "cron-1",
    expression: "1 * * * *",
    max_attempts: 10,
    name: "special-cron",
    paused: false,
    priority: 0,
    queue: "dedicated",
    tags: ["client", "scheduled"],
    worker: Other.Worker
  )

Naturally, individual options may be updated instead. For example, set paused: true to pause an entry:

{:ok, _} = DynamicCron.update(MyApp.ClientWorker, paused: true)

Since update/2 operates on a single entry at a time, it is possible to rename an entry without doing the delete/insert dance:

# With the worker as the entry name
{:ok, _} = DynamicCron.update(MyApp.ClientWorker, name: "client-worker")

# With a custom entry name already set
{:ok, _} = DynamicCron.update("cron-1", name: "special-cron")

Deleting Entries

@spec delete(cron_name()) :: {:ok, CronEntry.t()} | {:error, Ecto.Changeset.t()}

The delete/1 function operates on individual entries, by worker or name. You can use it to delete entries at runtime, rather than hard-coding the :delete flag into the crontab list at compile time.

# With the worker as the entry name
{:ok, _} = DynamicCron.delete(Worker)

# With a custom name
{:ok, _} = DynamicCron.delete("cron-1")

Listing Entries

Use all/0 to retrieve all persisted cron entries:

entries = DynamicCron.all()

This returns a list of Oban.Pro.Cron schemas with raw attributes. The all/0 function is provided as a convenience to inspect persisted entries.

As Oban.Pro.Cron is an Ecto schema you're free to query the table however you wish using Ecto.Query. For example, you can list all of the entries with a name like "client-":

import Ecto.Query, only: [where: 3]

Oban.Pro.Cron
|> where([c], ilike(c.name, "client-%"))
|> MyApp.Repo.all()

You can use functions like update or update_all to modify cron jobs in place, but it is highly recommended that you use update/2 to ensure that options are set correctly and to prevent breakage.

Isolation and Namespacing

All DynamicCron functions have an alternate clause that accepts an Oban instance name as the first argument. This is in line with base Oban functions such as Oban.insert/2, which allow you to seamlessly work with multiple Oban instances and across multiple database prefixes. For example, you can use all/1 to list all cron entries for the instance named ObanPrivate:

entries = DynamicCron.all(ObanPrivate)

Likewise, to insert a new entry using the configuration associated with the ObanPrivate instance:

{:ok, _} = DynamicCron.insert(ObanPrivate, [{"* * * * *", PrivateWorker}])

Instrumenting with Telemetry

The DynamicCron plugin uses Oban.Telemetry.span/3 to emit standardized plugin events.

| event | metadata |
| --- | --- |
| [:oban, :plugin, :start] | :plugin |
| [:oban, :plugin, :stop] | :plugin, :duration |
| [:oban, :plugin, :exception] | :plugin, :duration, :kind, :reason, :stacktrace |
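
The events listed above can be consumed with a standard :telemetry handler. What follows is a rough sketch under stated assumptions, not an official integration: the MyApp.CronTelemetry module and the handler id are hypothetical, the :plugin metadata value is assumed to be the plugin module, and :duration is assumed to be in native time units (it is read from the measurements, with a fallback to the metadata).

```elixir
defmodule MyApp.CronTelemetry do
  # Hypothetical module; only the event names come from the list above.
  require Logger

  @events [
    [:oban, :plugin, :start],
    [:oban, :plugin, :stop],
    [:oban, :plugin, :exception]
  ]

  # Attach once, for example while the application starts.
  def attach do
    :telemetry.attach_many("my-app-dynamic-cron", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event([:oban, :plugin, event], measurements, metadata, _config) do
    # Assumption: :plugin holds the plugin module, so other plugins are skipped.
    if metadata[:plugin] == Oban.Pro.Plugins.DynamicCron do
      Logger.info(describe(event, measurements, metadata))
    end

    :ok
  end

  # Turn an event into a log line; kept pure so it is easy to test.
  def describe(:start, _measurements, _metadata), do: "dynamic cron: started"

  def describe(:stop, measurements, metadata) do
    "dynamic cron: finished in #{duration_ms(measurements, metadata)}ms"
  end

  def describe(:exception, measurements, metadata) do
    "dynamic cron: #{inspect(metadata[:kind])} after #{duration_ms(measurements, metadata)}ms"
  end

  # Assumption: the duration is expressed in native time units.
  defp duration_ms(measurements, metadata) do
    native = Map.get(measurements, :duration) || Map.get(metadata, :duration) || 0
    System.convert_time_unit(native, :native, :millisecond)
  end
end
```

Calling MyApp.CronTelemetry.attach/0 more than once with the same handler id returns {:error, :already_exists}, so it is best invoked from a single place.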