View Source Instream (Instream v2.2.1)

InfluxDB driver for Elixir

connections

Connections

To connect to an InfluxDB server you need a connection module:

defmodule MyConnection do
  use Instream.Connection, otp_app: :my_app
end

The :otp_app name and the name of the module can be freely chosen but have to be linked to a corresponding configuration entry. This defined connection module needs to be hooked up into your supervision tree:

children = [
  # ...
  MyConnection,
  # ...
]

Example of the matching configuration entry:

# InfluxDB v2.x
config :my_app, MyConnection,
  auth: [method: :token, token: "my_token"],
  bucket: "my_default_bucket",
  org: "my_default_org",
  host: "my.influxdb.host",
  version: :v2

# InfluxDB v1.x
config :my_app, MyConnection,
  auth: [username: "my_username", password: "my_password"],
  database: "my_default_database",
  host: "my.influxdb.host"

More details on connections and configuration options can be found with the modules Instream.Connection and Instream.Connection.Config.

queries

Queries

To read data from your InfluxDB server you should send a query:

# Flux query
MyConnection.query(~s[
  from(bucket: "#{MyConnection.config(:bucket)}")
  |> range(start: -5m)
  |> filter(fn: (r) =>
    r._measurement == "instream_examples"
  )
])

# InfluxQL query
MyConnection.query("SELECT * FROM instream_examples")

Most of the queries you send require a :database or :bucket/:organization to operate on.

These values will be taken from your connection configuration by default. By using the option argument of MyConnection.query() you can pass different values to use on a per-query basis:

MyConnection.query(
  "... query ...",
  bucket: "my_other_bucket",
  org: "my_other_organization"
)

MyConnection.query("... query ...", database: "my_other_database")

Responses from a query will be decoded into maps by default.

Depending on your InfluxDB version you can use the :result_as option argument to skip the decoding or request a non-default response type:

  • result_as: :csv: response as CSV when using InfluxDB v1
  • result_as: :raw: result as sent from the server without decoding

query-language-selection

Query Language Selection

Depending on your configured InfluxDB version all queries will be treated as :flux (v2) or :influxql by default. You can send a query in the non-default language by passing the :query_language option:

MyConnection.query("... query ...", query_language: :flux)
MyConnection.query("... query ...", query_language: :influxql)

query-parameter-binding-influxdb-v1-x

Query Parameter Binding (InfluxDB v1.x)

Queries can be parameterized, for example when you are dealing with untrusted user input:

MyConnection.query(
  "SELECT * FROM some_measurement WHERE field = $field_param",
  params: %{field_param: "some_value"}
)

post-queries-influxdb-v1-x

POST Queries (InfluxDB v1.x)

Some queries require you to switch from the regular read only context (all GET requests) to a write context (all POST requests).

When not using the query build you have to pass that information manually to query/2:

MyConnection.query(
  "CREATE DATABASE create_in_write_mode",
  method: :post
)

query-timeout-configuration

Query Timeout Configuration

If you find your queries running into timeouts (e.g. :hackney not waiting long enough for a response) you can pass an option to the query call:

MyConnection.query(query, http_opts: [recv_timeout: 250])

This value can also be set as a default using your HTTP client configuration. A passed configuration will take precedence over the connection configuration.

writing-points

Writing Points

Writing data to your InfluxDB server is done using either Instream.Series modules or raw maps.

Depending on your connection configuration the selected writer module provides additional options.

The write function can be used with a single or multiple data points:

MyConnection.write(point)
MyConnection.write([point_1, point_2])

writing-points-using-series

Writing Points using Series

Each series in your database can be represented using a definition module:

defmodule MySeries do
  use Instream.Series

  series do
    measurement "my_measurement"

    tag :bar
    tag :foo

    field :value
  end
end

This module will provide you with a struct you can use to define points you want to write to your database:

MyConnection.write(%MySeries{
  fields: %MySeries.Fields{value: 17},
  tags: %MySeries.Tags{bar: "bar", foo: "foo"}
})

More information about series definitions can be found in the module documentation of Instream.Series.

writing-points-using-plain-maps

Writing Points using Plain Maps

As an alternative you can use a non-struct map to write points to a database:

MyConnection.write(
  %{
    measurement: "my_measurement",
    fields: %{answer: 42, value: 1},
    tags: %{foo: "bar"},
    timestamp: 1_439_587_926_000_000_000
  },
  # more points possible ...
)

The field :timestamp is optional. InfluxDB will use the receive time of the write request if it is missing.

deleting-data-influxdb-v2-x

Deleting Data (InfluxDB v2.x)

If you are connecting to an InfluxDB v2.x instance you can use the deletion API:

MyConnection.delete(%{
  start: DateTime.to_iso(~U[2020-01-01T00:00:00Z]),
  stop: DateTime.to_iso(~U[2020-01-02T00:00:00Z]),
  predicate: ~S(_measurement="optional" AND foo="bar")
})