InfluxEx.Flux (influx_ex v0.3.1)
Functions to build Flux queries
Current time related functionality only supports flux's duration types, for
this is a value such has 15m
. Where 15
is the number of the unit that
follows, which in this case is m
meaning minutes.
For more information about duration types see the Flux docs.
The %InfluxEx.Flux{}
struct implements the String.Chars
protocol which
allows to_string/1
to be called on the structure.
Link to this section Summary
Functions
Add an aggregate window
Set which field the query is for
Fill missing values with the previous one
Set the bucket for the query
Set the measurement name to query for
Set the time range for the query
Run the flux query
Set a tag to filter against
Link to this section Types
aggregate_selector()
@type aggregate_selector() :: :mean
aggregate_window()
@type aggregate_window() :: %{ every: binary(), create_empty: false, fn: aggregate_selector() }
aggregate_window_opt()
@type aggregate_window_opt() :: {:create_empty, boolean()}
from_opt()
@type from_opt() :: {:imports, binary()}
Options to pass to the from/2
function
:imports
- a list of Flux libraries to import
@type t() :: %InfluxEx.Flux{ aggregate_window: aggregate_window() | nil, bucket: InfluxEx.Bucket.name(), end: binary() | pos_integer() | nil, field: binary() | nil, fill_value_use_previous: boolean(), imports: [binary()], measurement: binary() | nil, start: binary() | pos_integer() | nil, tags: map() }
Data structure for a flux query
Link to this section Functions
aggregate_window(f, every, opts \\ [])
@spec aggregate_window(t(), binary(), [aggregate_window_opt()]) :: t()
Add an aggregate window
field(f, field)
Set which field the query is for
"my_bucket"
|> InfluxEx.Flux.from()
|> InfluxEx.Flux.range("-15m")
|> InfluxEx.Flux.measurement("cpu")
|> InfluxEx.Flux.field("average")
fill_value_previous(f)
Fill missing values with the previous one
This is useful for setting the :create_empty
field to true
when using
Flux.aggregate_window/3
.
"my bucket"
|> InfluxEx.Flux.from()
|> InfluxEx.Flux.range("-1d")
|> InfluxEx.Flux.measurement("cpu")
|> InfluxEx.Flux.field("average")
|> InfluxEx.Flux.aggregate_window("1h", create_empty: true)
|> InfluxEx.Flux.fill_value_previous()
The above query will return tables with data over the last day averaged in one hour intervals. Where there's missing data points, the query will fill the value with the last data point's value.
from(bucket, opts \\ [])
@spec from(InfluxEx.Bucket.name(), [from_opt()]) :: t()
Set the bucket for the query
This function starts a new flux query.
InfluxEx.Flux.from("my bucket")
measurement(f, measurement)
Set the measurement name to query for
"my_bucket"
|> InfluxEx.Flux.from()
|> InfluxEx.Flux.range("-15m")
|> InfluxEx.Flux.measurement("cpu")
range(f, start, end_t \\ nil)
@spec range(t(), binary() | pos_integer(), binary() | pos_integer() | nil) :: t()
Set the time range for the query
To query a bucket over the last 15 minutes, you can use range/3
:
"my_bucket"
|> InfluxEx.Flux.from()
|> InfluxEx.Flux.range("-15m")
To query using an explicit time range with timestamps:
"my_bucket"
|> InfluxEx.Flux.from()
|> InfluxEx.Flux.range(1653678115, 1653678175)
run_query(flux_query, client, opts \\ [])
@spec run_query(t() | binary(), InfluxEx.Client.t(), [InfluxEx.query_opt()]) :: {:ok, InfluxEx.tables()} | {:error, InfluxEx.error()}
Run the flux query
You can either build a Flux query using the InfluxEx.Flux
query builder API
or you can provide a raw flux query.
query = """
from("my-bucket")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "rpm")
|> aggregateWindow(every: 1m, fn: max)
"""
InfluxEx.Flux.run_query(query, my_client)
tag(f, tag_name, tag_value)
Set a tag to filter against