Intro to Timescale
Mix.install([
  {:timescale, "~> 0.1.0"},
  {:kino_db, "~> 0.2.0"},
  {:jason, "~> 1.4"},
  {:postgrex, "~> 0.16.5"},
  # Add https://github.com/vorce/kino_ecto
  {:kino_ecto, git: "https://github.com/vorce/kino_ecto"}
])
Building a health tracker
Let's learn about using the timescale Elixir library for TimescaleDB.
For our example, let's imagine we're building a fitness application that tracks your heart rate through a wearable device. Our application will receive (literal) heartbeats from the wearable device at varying intervals that we will record into the database.
Setting up the Repo
First and foremost, please ensure you have TimescaleDB installed on your machine by following the installation guide. To go through these examples, you'll also need the Timescale Toolkit installed. The easiest way to install both is with Docker; if you're brave, you can build from source.
Once Postgres/TimescaleDB is up and running, you'll need to set up secrets for connecting to the database. You'll also need to create a Postgres database. Try the following command in your shell:
$ psql -c 'create database timescale_fitness'
You'll also need to set the following secrets in Livebook:
- POSTGRES_HOST - postgres hostname to connect to (default: localhost)
- POSTGRES_USER - postgres username to connect with (default: postgres)
- POSTGRES_PASSWORD - postgres password to connect with (default: postgres)
- POSTGRES_DATABASE - database name (default: timescale_fitness)
defmodule Fitness.Repo do
  use Ecto.Repo,
    otp_app: :fitness,
    adapter: Ecto.Adapters.Postgres

  def init(_type, config) do
    {:ok,
     Keyword.merge(config,
       migration_lock: :pg_advisory_lock,
       hostname: System.get_env("LB_POSTGRES_HOST", "localhost"),
       port: 5432,
       username: System.get_env("LB_POSTGRES_USER", "postgres"),
       password: System.get_env("LB_POSTGRES_PASSWORD", "postgres"),
       # If using Fly, enable IPv6 below
       # socket_options: [:inet6],
       database: System.get_env("LB_POSTGRES_DATABASE", "timescale_fitness")
     )}
  end

  # Helper function for Livebook demonstrations.
  def migrate({num, migration}, direction) when is_atom(migration) do
    {:ok, _, _} =
      Ecto.Migrator.with_repo(__MODULE__, fn repo ->
        Ecto.Migrator.run(repo, [{num, migration}], direction, all: true)
      end)

    "Successfully Migrated #{inspect(migration)}"
  end
end

alias Fitness.Repo
import Ecto.Query, except: [first: 2, last: 2]
{:ok, repo_pid} = Kino.start_child(Repo)
# A raw Postgrex connection, used later in the notebook for running ad-hoc SQL.
opts = [
  hostname: "localhost",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "timescale_fitness"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})
Creating our hypertable migration
defmodule Fitness.Repo.Migrations.CreateHeartbeat do
  use Ecto.Migration
  import Timescale.Migration

  def up do
    create_timescaledb_extension()

    create_if_not_exists table(:users) do
      add(:fullname, :string)
    end

    create_if_not_exists(unique_index(:users, [:fullname]))

    create_if_not_exists table(:heartbeats, primary_key: false) do
      add(:timestamp, :naive_datetime_usec, null: false)
      add(:user_id, references(:users), null: false)
    end

    create_hypertable(:heartbeats, :timestamp)
  end

  def down do
    drop(table("heartbeats"), mode: :cascade)
    drop(table("users"), mode: :cascade)
    drop_timescaledb_extension()
  end
end
Repo.migrate({0, Fitness.Repo.Migrations.CreateHeartbeat}, :up)
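If you'd like to confirm that the heartbeats table was registered as a hypertable, you can peek at TimescaleDB's information views. This isn't part of the migration itself, just a quick sanity check:
Postgrex.query!(conn, "SELECT hypertable_name FROM timescaledb_information.hypertables;", [])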
Generating mock data
To facilitate our example, let's create three users who are tracking their heartbeats.
users = [
  %{fullname: "Dave Lucia"},
  %{fullname: "Alex Koutmos"},
  %{fullname: "Peter Ullrich"}
]

Repo.insert_all("users", users, on_conflict: :nothing)
query =
  from(u in "users", order_by: [desc: u.fullname], select: %{id: u.id, fullname: u.fullname})

[%{id: peter_id} = peter, %{id: dave_id} = dave, %{id: alex_id} = alex] = Repo.all(query)
Next, we've built a little module to help us simulate heartbeats for an entire day.
defmodule Fitness.Generator do
  @ms_in_day :timer.hours(24)

  @doc """
  Given a date, will generate a list of heartbeats for the day,
  represented as a list of maps with a `timestamp` field
  """
  @spec generate_heartbeats(%{id: integer()}, Date.t()) :: list(%{timestamp: NaiveDateTime.t()})
  def generate_heartbeats(user, day) do
    do_generate_heartbeats(user, [], NaiveDateTime.new!(day, ~T[00:00:00.000]), 0)
  end

  defp do_generate_heartbeats(user, heartbeats, day, ms) do
    # keep it between 60-200 beats per minute
    next = floor(:timer.minutes(1) / Enum.random(60..200)) + ms

    if next < @ms_in_day do
      heartbeat = %{timestamp: NaiveDateTime.add(day, next, :millisecond), user_id: user.id}
      do_generate_heartbeats(user, [heartbeat | heartbeats], day, next)
    else
      Enum.reverse(heartbeats)
    end
  end
end
# TODO try method in https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit
# Postgres caps a single statement at 65,535 bind parameters, and insert_all sends
# one parameter per field per row, so we insert the heartbeats in chunks.
batch_insert = fn heartbeats ->
  heartbeats
  |> Enum.chunk_every(100)
  |> Enum.map(fn chunk ->
    Repo.insert_all("heartbeats", chunk)
  end)
end
Generate small dataset
Next, we generate heartbeats for one user, and batch insert them into the database.
batch_insert.(Fitness.Generator.generate_heartbeats(alex, ~D[2022-09-22]))
Querying with Timescale
Now that we have a dataset generated, let's try some queries using Timescale hyperfunctions. Let's start with the basics and try to get the first and last values in our time-series, using the first and last hyperfunctions.
import Timescale.Hyperfunctions
Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^alex_id,
    select: {first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp)}
  )
)
Ok, so not so interesting, but we can validate that our data starts roughly at the beginning of the day and ends just before midnight.
For more of a challenge, let's use the time_bucket hyperfunction to calculate BPM over the course of the day.
First, let's explore how the time_bucket hyperfunction works.
Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^alex_id,
    select: {h.timestamp, time_bucket(h.timestamp, "1 second")},
    limit: 5
  )
)
The first item in each tuple is the actual timestamp of the heartbeat, down to the microsecond. The second item is the result of time_bucket/2 on the timestamp, bucketed down to the nearest second. time_bucket/2 acts like the floor/1 math function, and as we'll see in a moment, enables further aggregation over time-series. Next, let's bucket the heartbeats by minute and count them, which gives us a BPM reading for each minute:
Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^alex_id,
    group_by: selected_as(:minute),
    select: %{
      minute: selected_as(time_bucket(h.timestamp, "1 minute"), :minute),
      bpm: count(h)
    },
    limit: 5
  )
)
Performance with large datasets
So far, we've only taken a look at a relatively small dataset.
Repo.one(from(h in "heartbeats", select: count(h)))
Let's make this way more interesting by generating a month's worth of heartbeats for our second user. As shown above, that should put us at around 5 million rows of data. Go grab a soda, as this is gonna take a little while to execute.
for date <- Date.range(~D[2022-10-01], ~D[2022-10-31]) do
  batch_insert.(Fitness.Generator.generate_heartbeats(dave, date))
end
result3 = Postgrex.query!(conn, "select * from approximate_row_count('heartbeats');", [])
Now that we have a large dataset, let's retry our previous queries and see how they perform.
result4 =
  Postgrex.query!(
    conn,
    "explain analyze select first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp) from heartbeats as h where user_id = 1;",
    []
  )
Wow! They're still lightning fast. Timescale changes the query plan to make use of the hypertable architecture. Each chunk of our hypertable is aware of the time column, and can easily eliminate rows in bulk from our query plan.
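To see that chunking for yourself, you can ask TimescaleDB to list the chunks backing the hypertable. This is purely illustrative and isn't needed for the rest of the notebook:
Postgrex.query!(conn, "select show_chunks('heartbeats');", [])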
Compression
One of the most compelling features of TimescaleDB is its ability to compress data. While data storage may be relatively cheap, the rate at which time-series data grows changes the equation a bit!
result5 =
  Postgrex.query!(
    conn,
    "select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size",
    []
  )
defmodule Fitness.Repo.Migrations.CompressHeartbeats do
  use Ecto.Migration
  import Timescale.Migration

  def up do
    enable_hypertable_compression(:heartbeats, segment_by: :user_id)
    add_compression_policy(:heartbeats, "7 days")
  end

  def down do
    execute("SELECT decompress_chunk(i, true) from show_chunks('heartbeats') i;")
    flush()
    remove_compression_policy(:heartbeats, if_exists: true)
    disable_hypertable_compression(:heartbeats)
  end
end
Repo.migrate({1, Fitness.Repo.Migrations.CompressHeartbeats}, :up)
Now that we've compressed the table, we should observe that its size has been dramatically reduced. For some datasets, it's common to see compression ratios of up to 96%.
result6 =
  Postgrex.query!(
    conn,
    "select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size",
    []
  )
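For a more detailed breakdown, TimescaleDB also tracks per-hypertable compression statistics. Here's a small sketch of that query, assuming the policy above has already compressed at least one chunk:
Postgrex.query!(
  conn,
  """
  select pg_size_pretty(before_compression_total_bytes) as before_size,
         pg_size_pretty(after_compression_total_bytes) as after_size
  from hypertable_compression_stats('heartbeats');
  """,
  []
)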
Continuous Aggregates
As should be clear by now, aggregating data over time is extremely common when working with time-series data. Often, you'll want to compute aggregations across your dataset, grouped by various attributes such as your users, location, and of course, time.
TimescaleDB has a feature called Continuous Aggregates that dramatically improves the performance of expensive aggregations. It accomplishes this with some clever usage of materialized views, triggers, and hypertables to produce views over your data that update in real time and can be queried almost instantaneously.
Let's build a continuous aggregate that calculates BPM for each user, minute by minute.
result7 =
  Postgrex.query!(
    conn,
    """
    CREATE MATERIALIZED VIEW heartbeats_minutely
    WITH (timescaledb.continuous) AS
    SELECT time_bucket('1 minute', h.timestamp) as minute, count(h) as bpm, user_id
    FROM heartbeats as h
    GROUP BY minute, user_id
    """,
    []
  )
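The continuous aggregate can be queried like any regular table or view. Here's a quick sketch that reads back the per-minute BPM defined above (the field names come from the view's SELECT list):
Repo.all(
  from(hm in "heartbeats_minutely",
    where: hm.user_id == ^dave_id,
    order_by: [desc: hm.minute],
    select: %{minute: hm.minute, bpm: hm.bpm},
    limit: 5
  )
)
We can also attach a data retention policy, which tells TimescaleDB to drop raw heartbeat chunks once they're more than a year old, while the aggregate above keeps its rolled-up data: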
result8 =
  Postgrex.query!(
    conn,
    "SELECT add_retention_policy('heartbeats', INTERVAL '1 year');",
    []
  )
Filling gaps in time-series
Warning: this section requires the TimescaleDB Toolkit to be installed, which may require building from source.
defmodule Fitness.Repo.Migrations.AddTimescaleToolkit do
  use Ecto.Migration
  import Timescale.Migration

  def up do
    create_timescaledb_toolkit_extension()
  end

  def down do
    drop_timescaledb_toolkit_extension()
  end
end
Repo.migrate({3, Fitness.Repo.Migrations.AddTimescaleToolkit}, :up)
With any system, there's a chance it will go down. This could be due to a datacenter outage, global DNS being attacked, or simply an application configuration mishap. Regardless of the cause, gaps can appear in our collected data, which poses problems for calculations and aggregations.
Luckily, TimescaleDB provides hyperfunctions for gap-filling and interpolation to smooth over these gaps.
Let's add some heartbeats for another user, one every second, skipping over a few.
Repo.insert_all("heartbeats", [
  %{timestamp: ~N[2022-11-01 09:00:00], user_id: peter_id},
  %{timestamp: ~N[2022-11-01 09:00:01], user_id: peter_id},
  # No heartbeat at 9:00:02
  %{timestamp: ~N[2022-11-01 09:00:03], user_id: peter_id},
  # No heartbeat at 9:00:04
  %{timestamp: ~N[2022-11-01 09:00:05], user_id: peter_id}
])
Now we've got 4 heartbeats, with two missing at seconds 2 and 4. We can use the time_bucket_gapfill/2 function to fill in the missing seconds.
Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^peter_id,
    select: %{
      second: selected_as(time_bucket_gapfill(h.timestamp, "1 second"), :second),
      user_id: h.user_id
    },
    where:
      h.timestamp >= ^~N[2022-11-01 09:00:00] and
        h.timestamp <= ^~N[2022-11-01 09:00:05],
    group_by: [selected_as(:second), h.user_id]
  )
)
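time_bucket_gapfill pairs nicely with TimescaleDB's locf ("last observation carried forward") function, which replaces the NULL aggregate in each generated gap row with the most recent real value. Here's a sketch of that in raw SQL over the same window, so we don't have to assume a particular Elixir helper exists for it:
Postgrex.query!(
  conn,
  """
  select time_bucket_gapfill('1 second', timestamp) as second,
         locf(count(*)) as beats
  from heartbeats
  where user_id = $1
    and timestamp >= '2022-11-01 09:00:00'
    and timestamp <= '2022-11-01 09:00:05'
  group by second
  order by second;
  """,
  [peter_id]
)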