ObanDoctor.Plugins.QueueMetrics (oban_doctor v0.2.2)

Monitors queue depth and job counts across Oban queues.

As Oban Plugin

Add to your Oban config for scheduled telemetry emission:

config :my_app, Oban,
  plugins: [
    {ObanDoctor.Plugins.QueueMetrics, interval: :timer.minutes(5)}
  ]

The plugin automatically uses your Oban instance's configuration (repo, prefix, queues, limits). Only the leader node (via Oban.Peer) executes the scheduled check, ensuring metrics are emitted once across your cluster.

Plugin Options

  • :interval - How often to emit telemetry, in milliseconds (default: :timer.minutes(5), i.e. 300_000)
  • :queues - Restrict metrics to the listed queues (default: all configured queues)
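
As a sketch of both options used together, the fragment below emits metrics every minute and restricts them to two queues. The application name :my_app and the queue names :default and :mailers are placeholders, not values the plugin requires.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  plugins: [
    # Placeholder values: emit every minute, only for :default and :mailers.
    {ObanDoctor.Plugins.QueueMetrics,
     interval: :timer.minutes(1), queues: [:default, :mailers]}
  ]
```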

Telemetry

Emits [:oban_doctor, :queue, :metrics] with the same data as metrics/2:

:telemetry.attach(
  "my-handler",
  [:oban_doctor, :queue, :metrics],
  fn _event, measurements, metadata, _config ->
    # measurements = %{queue_count: 2}
    # metadata = %{
    #   oban_name: Oban,
    #   metrics: %{
    #     queues: [%{queue: :default, available: 5, executing: 2, limit: 10, ...}, ...],
    #     total: %{available: 10, executing: 3, ...}
    #   }
    # }
    IO.inspect({measurements, metadata}, label: "queue metrics")
  end,
  nil
)
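
For anything beyond experimentation, a handler defined in a module is usually preferable to an anonymous function, since :telemetry logs a notice about local function handlers. The following is a minimal sketch of that approach. The module name MyApp.QueueMetricsHandler, the handler id, and the log format are hypothetical; only the event name and the payload shape come from the description above.

```elixir
defmodule MyApp.QueueMetricsHandler do
  # Hypothetical module; not part of oban_doctor.
  require Logger

  @event [:oban_doctor, :queue, :metrics]

  # Call once at application start. The handler is passed as a captured
  # module function rather than an anonymous function.
  def attach do
    :telemetry.attach("my-app-queue-metrics", @event, &__MODULE__.handle_event/4, nil)
  end

  # Matches only the queue metrics event and logs a one-line summary.
  def handle_event(@event, measurements, metadata, _config) do
    Logger.info(summarize(measurements, metadata))
  end

  # Pure helper so the formatting can be exercised without telemetry.
  def summarize(%{queue_count: count}, %{metrics: %{total: total}}) do
    "#{count} queues: #{total.available} available, #{total.executing} executing"
  end
end
```

Calling MyApp.QueueMetricsHandler.attach() from the application's start callback would register the handler; this assumes the :telemetry dependency is available, as it is in any project that uses Oban.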

Standalone Usage

Query queue metrics directly without configuring the plugin:

ObanDoctor.Plugins.QueueMetrics.metrics(MyApp.Repo, oban: Oban)
#=> %{
#     queues: [
#       %{queue: :default, available: 5, executing: 2, limit: 10, utilization_pct: 20.0, ...}
#     ],
#     total: %{available: 5, executing: 2, ...}
#   }

# Filter to specific queues
ObanDoctor.Plugins.QueueMetrics.metrics(MyApp.Repo, oban: Oban, queues: [:default])

Metrics

Metric           Description
available        Jobs ready to execute immediately
scheduled        Jobs scheduled for future execution
executing        Jobs currently being processed
retryable        Failed jobs waiting for retry
completed        Successfully completed jobs
discarded        Jobs that exceeded max attempts
cancelled        Manually cancelled jobs
limit            Configured concurrency limit for the queue
utilization_pct  Current utilization: executing / limit * 100
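
To make the utilization_pct formula concrete, here is a small stand-alone sketch of the arithmetic. It is not the library's implementation: the module name is hypothetical, and both the rounding to one decimal place and the nil result for a missing limit are assumptions of this sketch.

```elixir
defmodule UtilizationSketch do
  # Illustrative only: mirrors the documented formula executing / limit * 100.
  # Rounding to one decimal place is an assumption of this sketch.
  def utilization_pct(executing, limit) when is_integer(limit) and limit > 0 do
    Float.round(executing / limit * 100, 1)
  end

  # No positive limit known: return nil rather than dividing by zero
  # (also an assumption of this sketch).
  def utilization_pct(_executing, _limit), do: nil
end

UtilizationSketch.utilization_pct(2, 10)
# => 20.0
UtilizationSketch.utilization_pct(1, 5)
# => 20.0
```

Both calls reproduce the 20.0 figures shown for the :default and :mailers queues in the sample output on this page.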

Capacity Planning

Use utilization_pct to identify queues that need scaling:

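require Logger

metrics = ObanDoctor.Plugins.QueueMetrics.metrics(MyApp.Repo, oban: Oban)

# utilization_pct is only available when the queue's limit is known,
# so skip queues without a numeric value.
metrics.queues
|> Enum.filter(&(is_number(&1[:utilization_pct]) and &1.utilization_pct > 80))
|> Enum.each(fn queue ->
  Logger.warning("Queue #{queue.queue} at #{queue.utilization_pct}% utilization")
end)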
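
If the same check is needed in several places (an alert, a health endpoint, a test), it may help to pull the filtering into a pure function that takes the metrics map as input. The helper below is a hypothetical sketch, not part of oban_doctor; the threshold default of 80 simply mirrors the example above, and the :events queue in the sample data is invented to show a queue without a known limit.

```elixir
defmodule CapacitySketch do
  # Hypothetical helper, not part of oban_doctor.
  # Returns {queue, utilization_pct} pairs above the threshold, busiest first.
  # Queues without a numeric utilization_pct are ignored.
  def saturated(%{queues: queues}, threshold \\ 80) do
    queues
    |> Enum.filter(&(is_number(&1[:utilization_pct]) and &1.utilization_pct > threshold))
    |> Enum.sort_by(& &1.utilization_pct, :desc)
    |> Enum.map(&{&1.queue, &1.utilization_pct})
  end
end

# Sample data in the documented shape; the values are made up.
sample = %{
  queues: [
    %{queue: :default, executing: 9, limit: 10, utilization_pct: 90.0},
    %{queue: :mailers, executing: 1, limit: 5, utilization_pct: 20.0},
    %{queue: :events, executing: 3}
  ]
}

CapacitySketch.saturated(sample)
# => [default: 90.0]
```

Because the function never touches the database, it can be tested with literal maps and fed the result of metrics/2 in production.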

Summary

Functions

metrics(repo, opts \\ [])

Query current queue metrics from the database.

Types

option()

@type option() ::
  Oban.Plugin.option() | {:interval, pos_integer()} | {:queues, [atom()]}

Functions

metrics(repo, opts \\ [])

@spec metrics(
  module(),
  keyword()
) :: map()

Query current queue metrics from the database.

Returns a map with queue statistics including job counts by state.

Options

  • :oban - Oban instance name to get config from (prefix, queues, limits)
  • :conf - Oban config struct (when called from a plugin)
  • :queues - List of queue names to filter to (default: all from config)

The prefix, queues, and limits are automatically extracted from the Oban configuration via :oban or :conf.

Return Value

Returns a map with :queues (list of per-queue stats) and :total (aggregated counts).

Per-queue stats include:

  • :queue - Queue name as atom
  • :available, :scheduled, :executing, :retryable, :completed, :discarded, :cancelled - Job counts by state
  • :limit - Configured concurrency limit (when config available)
  • :utilization_pct - Current utilization as percentage: executing / limit * 100 (when limit available)
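
The relationship between the per-queue stats and the :total entry can be illustrated with a short sketch that sums each state across queues. This is an assumption about how the aggregation behaves, written against the documented field names; the module name is hypothetical and the library may compute :total differently (for example, directly in SQL).

```elixir
defmodule TotalsSketch do
  @states [:available, :scheduled, :executing, :retryable, :completed, :discarded, :cancelled]

  # Sums each job-state count across the per-queue maps.
  # A state missing from a queue map counts as 0 (an assumption of this sketch).
  def total(queues) do
    Map.new(@states, fn state ->
      {state, queues |> Enum.map(&Map.get(&1, state, 0)) |> Enum.sum()}
    end)
  end
end

# Sample per-queue stats; the values are made up for illustration.
queues = [
  %{queue: :default, available: 5, scheduled: 10, executing: 2},
  %{queue: :mailers, available: 0, scheduled: 3, executing: 1}
]

# The match succeeds when the summed counts are as expected.
%{available: 5, scheduled: 13, executing: 3} = TotalsSketch.total(queues)
```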

Examples

# Use running Oban instance config (prefix, queues, limits).
# Assumes: alias ObanDoctor.Plugins.QueueMetrics
iex> QueueMetrics.metrics(MyApp.Repo, oban: Oban)
%{
  queues: [
    %{queue: :default, available: 5, scheduled: 10, executing: 2, limit: 10, utilization_pct: 20.0, ...},
    %{queue: :mailers, available: 0, scheduled: 3, executing: 1, limit: 5, utilization_pct: 20.0, ...}
  ],
  total: %{available: 5, scheduled: 13, executing: 3, ...}
}

# Filter to specific queues
iex> QueueMetrics.metrics(MyApp.Repo, oban: Oban, queues: [:default])
%{queues: [%{queue: :default, ...}], total: ...}