Oban.Met (Oban Met v1.0.3)

Met is a distributed, compacting, multidimensional, telemetry-powered time series datastore for Oban that requires no configuration. It gathers data for queues, job counts, execution metrics, active crontabs, historic metrics, and more.

Met powers the charts and runtime details shown in the Oban Web dashboard.

Features

  • 🤖 Autonomous - Supervises a collection of autonomous modules that dynamically start and stop alongside Oban instances without any code changes.

  • 🎩 Distributed - Metrics are shared between all connected nodes via pubsub. Leadership is used to restrict expensive operations, such as performing counts, to a single node.

  • 📼 Recorded - Telemetry events and scraped data are stored in-memory as time series data. Values are stored as either gauges or space-efficient "sketches".

  • 🪐 Multidimensional - Metrics are stored with labels such as node, queue, worker, etc. that can be filtered and grouped dynamically at runtime.

  • 🗜️ Compacting - Time series values are periodically compacted into larger windows of time to save space and optimize querying historic data. Compaction periods use safe defaults, but are configurable.

  • ✏️ Estimating - In supporting systems (Postgres), count queries use optimized estimates automatically for tables with a large number of jobs.

  • 🔎 Queryable - Historic metrics may be filtered and grouped by any label, sliced by arbitrary time intervals, and numeric values aggregated at dynamic percentiles (e.g. P50, P99) without pre-computed histogram buckets.

  • 🤝 Handoff - Ephemeral data storage via data replication with handoff between nodes. All nodes have a shared view of the cluster's data and new nodes are caught up when they come online.

Installation

Oban Met is included with Oban Web and manual installation is only necessary in hybrid environments (separate Web and Worker nodes).

To receive metrics from non-web nodes in a system with separate "web" and "worker" applications you must explicitly include oban_met as a dependency for "workers".

{:oban_met, "~> 1.0"}

Usage

No configuration is necessary and Oban Met will start automatically in a typical application. A variety of options are provided for more complex or nuanced usage.

Auto Start

Supervised Met instances start automatically along with Oban instances unless Oban is in testing mode. You can disable auto-starting globally with application configuration:

config :oban_met, auto_start: false

Then start Met explicitly by adding it to your Oban instance's plugins:

plugins: [
  Oban.Met,
  ...
]

Customizing Estimates

Options for internal Oban.Met processes can be overridden from the plugin specification. Most options are internal and not meant to be overridden, but one particularly useful option to tune is estimate_limit. The estimate_limit determines the point at which state/queue counts switch from an accurate count(*) call to a much more efficient, but less accurate, estimate function.

The default limit is a conservative 50k, which may be too low for systems with insert spikes. This declares an override to set the limit to 200k:

{Oban.Met, reporter: [estimate_limit: 200_000]}
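
As a rough sketch of how the pieces above might fit together in a runtime or compile-time config file: auto-start is disabled and Met is listed once as a plugin with the override. The application name :my_app, the MyApp.Repo module, and the queue settings are hypothetical placeholders, not values from this documentation.

```elixir
import Config

# Disable automatic startup so the plugin entry below is the only Met instance.
config :oban_met, auto_start: false

# :my_app, MyApp.Repo, and the queue list are hypothetical; substitute your own.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  plugins: [
    # Switch from exact counts to estimates once a count passes 200k.
    {Oban.Met, reporter: [estimate_limit: 200_000]}
  ]
```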

Explicit Migrations

Met will create the necessary estimate function automatically when possible. The migration isn't necessary under normal circumstances, but is provided to avoid permission issues or allow full control over database changes.

mix ecto.gen.migration add_oban_met

Open the generated migration and delegate the up/0 and down/0 functions to Oban.Met.Migration:

defmodule MyApp.Repo.Migrations.AddObanMet do
  use Ecto.Migration

  def up, do: Oban.Met.Migration.up()
  def down, do: Oban.Met.Migration.down()
end

If you run the explicit migration, disable auto-start and then configure the reporter not to auto-migrate:

{Oban.Met, reporter: [auto_migrate: false]}

Summary

Functions

checks(oban \\ Oban)
Retrieve stored producer checks.

crontab(oban \\ Oban)
Get a normalized, unified crontab from all connected nodes.

labels(oban \\ Oban, label, opts \\ [])
Get all stored, unique values for a particular label.

latest(oban \\ Oban, series, opts \\ [])
Get the latest values for a gauge series, optionally subdivided by a label.

lookup(oban \\ Oban, series)
Get all stored values for a series without any filtering.

series(oban \\ Oban)
List all recorded series along with their labels and value type.

start_link(opts)
Start a Met supervisor for an Oban instance.

timeslice(oban \\ Oban, series, opts \\ [])
Summarize a series of data with an aggregate over a configurable window of time.

Types

counts()

@type counts() :: %{optional(String.t()) => non_neg_integer()}

filter_value()

@type filter_value() :: label() | [label()]

label()

@type label() :: String.t()

latest_opts()

@type latest_opts() :: [
  filters: keyword(filter_value()),
  group: nil | label(),
  lookback: pos_integer()
]

operation()

@type operation() :: :max | :sum | {:pct, float()}

series()

@type series() :: atom() | String.t()

series_detail()

@type series_detail() :: %{series: series(), labels: [label()], value: module()}

sub_counts()

@type sub_counts() :: %{optional(String.t()) => non_neg_integer() | counts()}

timeslice_opts()

@type timeslice_opts() :: [
  by: pos_integer(),
  filters: keyword(filter_value()),
  group: nil | label(),
  label: nil | label(),
  lookback: pos_integer(),
  operation: operation(),
  since: pos_integer()
]

ts()

@type ts() :: integer()

value()

@type value() :: Oban.Met.Value.t()

Functions

checks(oban \\ Oban)

@spec checks(Oban.name()) :: [map()]

Retrieve stored producer checks.

This mimics the output of the legacy Oban.Web.Plugins.Stats.all_gossip/1 function.

Checks are queried approximately every second and broadcast to all connected nodes, so each node is a replica of checks from the entire cluster. Checks are stored for 30 seconds before being purged.

Output

Checks are the result of Oban.check_queue/1, and the exact contents depend on which Oban.Engine is in use. A Basic engine check will look similar to this:

%{
  uuid: "2dde4c0f-53b8-4f59-9a16-a9487454292d",
  limit: 10,
  node: "me@local",
  paused: false,
  queue: "default",
  running: [100, 102],
  started_at: ~U[2020-10-07 15:31:00Z],
  updated_at: ~U[2020-10-07 15:31:00Z]
}

Examples

Get all current checks:

Oban.Met.checks()

Get current checks for a non-standard Oban instance:

Oban.Met.checks(MyOban)
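
Because each check is a plain map, the replicated list can be summarized with ordinary Enum functions. The following is a minimal sketch that totals running jobs and limits per queue across nodes. The CheckSummary module is a hypothetical helper, not part of Oban.Met, and the sample list only mimics the Basic engine fields shown above; summing limits assumes each node's limit is local to that node.

```elixir
defmodule CheckSummary do
  @moduledoc "Hypothetical helper for illustration; not part of Oban.Met."

  # Fold a list of check maps into %{queue => %{running: n, limit: n}}.
  def by_queue(checks) do
    Enum.reduce(checks, %{}, fn check, acc ->
      entry = %{running: length(check.running), limit: check.limit}

      Map.update(acc, check.queue, entry, fn prev ->
        %{running: prev.running + entry.running, limit: prev.limit + entry.limit}
      end)
    end)
  end
end

# Sample data standing in for the result of Oban.Met.checks().
checks = [
  %{node: "worker.1", queue: "default", limit: 10, paused: false, running: [100, 102]},
  %{node: "worker.2", queue: "default", limit: 10, paused: false, running: [103]},
  %{node: "worker.1", queue: "events", limit: 5, paused: true, running: []}
]

summary = CheckSummary.by_queue(checks)
```

In a running system the sample list would be replaced with the value returned by Oban.Met.checks().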

crontab(oban \\ Oban)

@spec crontab(Oban.name()) :: [{binary(), binary(), map()}]

Get a normalized, unified crontab from all connected nodes.

Examples

Get a merged crontab:

Oban.Met.crontab()
[
  {"* * * * *", "Worker.A", []},
  {"* * * * *", "Worker.B", [["args", %{"mode" => "foo"}]]}
]

Get the crontab for a non-standard Oban instance:

Oban.Met.crontab(MyOban)

labels(oban \\ Oban, label, opts \\ [])

@spec labels(Oban.name(), label(), keyword()) :: [label()]

Get all stored, unique values for a particular label.

Examples

Get all known queues:

Oban.Met.labels("queue")
~w(alpha gamma delta)

Get all known workers:

Oban.Met.labels("worker")
~w(MyApp.Worker MyApp.OtherWorker)

latest(oban \\ Oban, series, opts \\ [])

Get the latest values for a gauge series, optionally subdivided by a label.

Unlike queues and workers, states are static and constant, so they'll always show up in the counts or subdivision maps.

Gauge Series

Latest counts only apply to Gauge series. There are two gauges available (as reported by series/1):

  • :exec_count - jobs executing at that moment, including node, queue, state, and worker labels.

  • :full_count - jobs in the database, including queue and state labels.

Examples

Get the :full_count value without any grouping:

Oban.Met.latest(:full_count)
%{"all" => 99}

Group the :full_count value by state:

Oban.Met.latest(:full_count, group: "state")
%{"available" => 9, "completed" => 80, "executing" => 5, ...}

Group results by queue:

Oban.Met.latest(:exec_count, group: "queue")
%{"alpha" => 9, "gamma" => 3}

Group results by node:

Oban.Met.latest(:exec_count, group: "node")
%{"worker.1" => 6, "worker.2" => 5}

Filter values by node:

Oban.Met.latest(:exec_count, filters: [node: "worker.1"])
%{"all" => 6}

Filter values by multiple nodes:

Oban.Met.latest(:exec_count, filters: [node: ["worker.1", "worker.2"]])
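
Grouped results are plain maps of label to count, so they can be post-processed without any Met-specific helpers. This sketch ranks labels by count and adds each label's share of the total. The CountShare module is hypothetical, and the sample map simply copies the shape of the grouped-by-queue result above.

```elixir
defmodule CountShare do
  @moduledoc "Hypothetical helper for illustration; not part of Oban.Met."

  # Return [{label, count, percent_of_total}] sorted by count, highest first.
  def rank(counts) do
    total = counts |> Map.values() |> Enum.sum()

    counts
    |> Enum.sort_by(fn {_label, count} -> count end, :desc)
    |> Enum.map(fn {label, count} ->
      # Guard against dividing by zero when every count is 0.
      share = if total == 0, do: 0.0, else: Float.round(count / total * 100, 1)
      {label, count, share}
    end)
  end
end

# Sample data standing in for Oban.Met.latest(:exec_count, group: "queue").
counts = %{"alpha" => 9, "gamma" => 3}

ranked = CountShare.rank(counts)
```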

lookup(oban \\ Oban, series)

@spec lookup(Oban.name(), series()) :: [term()]

Get all stored values for a series without any filtering.

series(oban \\ Oban)

@spec series(Oban.name()) :: [series_detail()]

List all recorded series along with their labels and value type.

Examples

Oban.Met.series()
[
  %{series: "exec_time", labels: ["state", "queue", "worker"], value: Sketch},
  %{series: "wait_time", labels: ["state", "queue", "worker"], value: Sketch},
  %{series: "exec_count", labels: ["state", "queue", "worker"], value: Gauge},
  %{series: "full_count", labels: ["state", "queue"], value: Gauge}
]

start_link(opts)

@spec start_link(Keyword.t()) :: Supervisor.on_start()

Start a Met supervisor for an Oban instance.

Oban.Met typically starts supervisors automatically when Oban instances initialize. However, you can start a supervisor manually when auto_start is disabled.

Options

Only :conf is required; without it the supervisor won't start:

  • :conf - configuration for a running Oban instance, required

  • :name - an optional name for the supervisor, defaults to Oban.Met

Example

Start a supervisor for the default Oban instance:

Oban.Met.start_link(conf: Oban.config())

Start a supervisor with a custom name:

Oban.Met.start_link(conf: Oban.config(), name: MyApp.MetSup)

timeslice(oban \\ Oban, series, opts \\ [])

@spec timeslice(Oban.name(), series(), timeslice_opts()) :: [{ts(), value(), label()}]

Summarize a series of data with an aggregate over a configurable window of time.

Examples

Retrieve a 3-second timeslice of the exec_time sketch:

Oban.Met.timeslice(Oban, :exec_time, lookback: 3)
[
  {2, 16771374649.128689, nil},
  {1, 24040058779.3428, nil},
  {0, 22191534459.516357, nil}
]
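
Since timeslice/3 is specified to return a flat list of {ts, value, label} tuples, a common next step is reshaping that list into one series per label, for example before charting. The sketch below does this with Enum.group_by/3. The sample tuples are stand-ins with the documented shape, and sorting by ascending ts is an arbitrary choice for illustration rather than anything Met requires.

```elixir
# Sample tuples with the {ts, value, label} shape from the @spec above.
slices = [
  {1, 9970235387.031698, "analysis"},
  {0, 11700429279.446463, "analysis"},
  {1, 23097311376.231316, "default"},
  {0, 23097311376.231316, "default"}
]

# Build %{label => [{ts, value}, ...]} with each series sorted by ts.
series_by_label =
  slices
  |> Enum.group_by(
    fn {_ts, _value, label} -> label end,
    fn {ts, value, _label} -> {ts, value} end
  )
  |> Map.new(fn {label, points} ->
    {label, Enum.sort_by(points, fn {ts, _value} -> ts end)}
  end)
```

Ungrouped results carry nil as the label, so they would all land under a single nil key in the resulting map.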

Group exec_time slices by the queue label:

Oban.Met.timeslice(Oban, :exec_time, group: "queue")
[
  {1, 9970235387.031698, "analysis"},
  {0, 11700429279.446463, "analysis"},
  {1, 23097311376.231316, "default"},
  {0, 23097311376.231316, "default"},
  {1, 1520977874.3348415, "events"},
  {0, 2558504265.2738624, "events"},
  ...