Electric.Client (Electric Client v0.2.5)
An Elixir client for ElectricSQL.
Electric is a sync engine that allows you to sync little subsets of data from Postgres into local apps and services. This client allows you to sync data from Electric into Elixir applications.
Quickstart
Start and connect the Electric sync service
Follow the Installation guide to get Electric up and running and connected to a Postgres database.
Create a table
Create a foo table in your Postgres schema, as per the Electric Quickstart guide, so that we have a table to sync.
CREATE TABLE foo (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255),
  value FLOAT
);
Install the Electric Client and receive sync events
Create a simple script that will subscribe to events from a foo table in your Postgres database:
# electric.ex
Mix.install([
  :electric_client
])

{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

# You can create a stream from a table name or a Shape defined using
# `ShapeDefinition.new/2`
stream = Electric.Client.stream(client, "foo")

for msg <- stream do
  IO.inspect(msg, pretty: true, syntax_colors: IO.ANSI.syntax_colors())
end
Then run it:
elixir electric.ex
In a separate terminal window, connect to the Postgres database:
psql "postgresql://postgres:password@localhost:54321/electric"
Now any modifications you make to the data in the foo table will appear as messages in the Elixir process.
INSERT INTO foo (name, value) VALUES
  ('josevalim', 4545),
  ('eksperimental', 966),
  ('lexmag', 677),
  ('whatyouhide', 598),
  ('ericmj', 583),
  ('alco', 377);
UPDATE foo SET value = value + 1;
Filtering Using WHERE clauses
You can subscribe to subsets of the data in your table using where clauses.
{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

{:ok, shape} = Electric.Client.ShapeDefinition.new("foo", where: "name ILIKE 'a%'")

stream = Electric.Client.stream(client, shape)

for msg <- stream do
  # you will now only receive events for database rows matching the `where` clause
end
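To get a feel for which of the quickstart rows the where clause above would select, the filter can be approximated locally. This is only a rough sketch in plain Elixir: the real filtering happens in Postgres on the server, and the downcase-and-prefix check is an assumed stand-in for ILIKE 'a%', not something the client does.

```elixir
# The names inserted in the quickstart above.
names = ["josevalim", "eksperimental", "lexmag", "whatyouhide", "ericmj", "alco"]

# Local approximation of `name ILIKE 'a%'`: a case-insensitive prefix match.
matching =
  Enum.filter(names, fn name ->
    name |> String.downcase() |> String.starts_with?("a")
  end)

IO.inspect(matching)
```

With the quickstart data, only the alco row would be part of this shape.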
Configuration
See new/1 for configuration options of the client itself, and stream/3 for details on configuring the stream.
Ecto Integration
If you have Ecto installed then you can define your Shapes using Ecto queries:
# ecto.ex
Mix.install([
  :ecto_sql,
  :electric_client
])

import Ecto.Query, only: [from: 2]
import Ecto.Query.API, only: [ilike: 2]

defmodule Foo do
  use Ecto.Schema

  schema "foo" do
    field :name, :string
    field :value, :float
  end
end

{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

# Replace the table or `ShapeDefinition` with an `Ecto` query and set
# `replica` to `:full` to receive full rows for update messages.
#
# The normal `replica: :default` setting will only send the changed
# columns, so we'd end up with partial `%Foo{}` instances.
stream =
  Electric.Client.stream(
    client,
    from(f in Foo, where: ilike(f.name, "a%")),
    replica: :full
  )

for %{headers: %{operation: operation}, value: value} <- stream do
  # The message `value` will now be a `%Foo{}` struct
  IO.inspect([{operation, value}], pretty: true, syntax_colors: IO.ANSI.syntax_colors())
end
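The comment above notes that replica: :default only sends the changed columns of an update. A minimal sketch of what that means for a consumer that keeps its own copy of the rows follows. It uses plain maps as stand-ins for the real message structs, and the ReplicaSketch module, the "id" key, and the :insert, :update and :delete operation atoms are all assumptions made for illustration rather than part of the Electric.Client API.

```elixir
defmodule ReplicaSketch do
  # Hypothetical helper, not part of Electric.Client: folds change messages
  # into a map of rows keyed by primary key.
  #
  # Messages are modelled as plain maps shaped like the pattern matched in
  # the `for` comprehension above: %{headers: %{operation: op}, value: row}.
  # The operation atoms used here are an assumption.

  def apply_change(rows, %{headers: %{operation: :insert}, value: %{"id" => id} = row}) do
    Map.put(rows, id, row)
  end

  def apply_change(rows, %{headers: %{operation: :update}, value: %{"id" => id} = changes}) do
    # A partial update carries only some columns, so merge them into the
    # row we already hold. A full-row update overwrites every column.
    Map.update(rows, id, changes, &Map.merge(&1, changes))
  end

  def apply_change(rows, %{headers: %{operation: :delete}, value: %{"id" => id}}) do
    Map.delete(rows, id)
  end
end

messages = [
  %{headers: %{operation: :insert}, value: %{"id" => "1", "name" => "alco", "value" => "377"}},
  # A partial update: the primary key plus the one changed column.
  %{headers: %{operation: :update}, value: %{"id" => "1", "value" => "378"}}
]

rows = Enum.reduce(messages, %{}, &ReplicaSketch.apply_change(&2, &1))
IO.inspect(rows)
```

A consumer that merges updates like this can work with partial updates. One that builds a fresh struct from each message cannot, which is why the Ecto example asks for replica: :full.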
Custom Values
Electric sends all column values as binaries. The Ecto integration uses Ecto's schema information to turn those into the relevant Elixir terms, but we can provide our own binary() => term() mapping by implementing the Electric.Client.ValueMapper behaviour.
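As a rough illustration of the binary() => term() idea, the sketch below decodes a row of binary values using a map of column types. It deliberately does not implement the Electric.Client.ValueMapper behaviour, whose callbacks are not shown on this page. The DecodeSketch module and the "int4" and "float8" type names are assumptions chosen to match the foo table from the quickstart.

```elixir
defmodule DecodeSketch do
  # Hypothetical decoder for illustration only. It is NOT an implementation
  # of the Electric.Client.ValueMapper behaviour.

  # Decode one binary column value according to an (assumed) type name.
  def decode(nil, _type), do: nil
  def decode(value, "int4"), do: String.to_integer(value)

  def decode(value, "float8") do
    # Float.parse/1 accepts integer-looking input such as "377".
    {float, ""} = Float.parse(value)
    float
  end

  # Unknown types are passed through as binaries.
  def decode(value, _other), do: value

  # Decode a whole row given a map of column name => type name.
  def decode_row(row, types) do
    Map.new(row, fn {column, value} ->
      {column, decode(value, Map.get(types, column))}
    end)
  end
end

types = %{"id" => "int4", "name" => "varchar", "value" => "float8"}
row = %{"id" => "6", "name" => "alco", "value" => "377"}

decoded = DecodeSketch.decode_row(row, types)
IO.inspect(decoded)
```

A real mapper would be passed to the stream through the :parser option described under stream/3. See the Electric.Client.ValueMapper documentation for the callbacks it must define.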
Types
@type client_options() :: [client_option()]
@type column() :: %{
  :type => String.t(),
  optional(:pk_index) => non_neg_integer(),
  optional(:not_null) => boolean(),
  optional(:max_length) => non_neg_integer(),
  optional(:length) => non_neg_integer()
}
@type cursor() :: integer()
@type message() ::
  Electric.Client.Message.ControlMessage.t()
  | Electric.Client.Message.ChangeMessage.t()
  | Electric.Client.Message.ResumeMessage.t()
@type replica() :: :default | :full
@type shape() :: table_name() | Electric.Client.ShapeDefinition.t() | Ecto.Queryable.t()
@type shape_handle() :: String.t()
@type stream_option() ::
  {:parser, nil | {module(), [term()]}}
  | {:live, boolean()}
  | {:replica, :default | :full}
  | {:oneshot, boolean()}
  | {:resume, Electric.Client.Message.ResumeMessage.t() | nil}
@type stream_options() :: [stream_option()]
@type table_name() :: String.t()
Functions
@spec authenticate_request(t(), Electric.Client.Fetch.Request.t()) :: Electric.Client.Fetch.Request.authenticated()
Authenticate the given request using the authenticator configured in the Client.
@spec authenticate_shape(t(), Electric.Client.ShapeDefinition.t()) :: Electric.Client.Authenticator.headers()
Get authentication query parameters for the given Electric.Client.ShapeDefinition.
Use the given client to delete the shape instance from the server.
Deleting a shape only works if Electric is configured with allow_shape_deletion enabled.
@spec new(client_options()) :: {:ok, t()} | {:error, term()}
Create a new client.
Options
:base_url - The URL of the Electric server, not including the path. E.g. for local development this would be http://localhost:3000.
:endpoint - The full URL of the shape API endpoint. E.g. for local development this would be http://localhost:3000/v1/shape. Use this if you need a non-standard API path.
:database_id - Which database to use. Optional unless Electric is used with multiple databases.
:base_url vs. :endpoint
If you configure your client using e.g. base_url: "http://localhost:3000", Electric will append the default shape API path "/v1/shape" to create the final endpoint configuration, in this case "http://localhost:3000/v1/shape".
If you wish to use a non-standard endpoint path because, for example, you wrap your Shape API calls in an authentication proxy, then configure the endpoint directly:
Client.new(endpoint: "https://my-server.my-domain.com/electric/shape/proxy")
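The relationship between the two options can be sketched as a small pure function. This is an illustrative model, not the client's actual implementation: the EndpointSketch module is hypothetical, and both the precedence of :endpoint over :base_url and the trimming of a trailing slash are assumptions.

```elixir
defmodule EndpointSketch do
  # Hypothetical model of how the final endpoint could be derived.

  # Default shape API path, as described above.
  @default_path "/v1/shape"

  def resolve(opts) do
    case Keyword.fetch(opts, :endpoint) do
      {:ok, endpoint} ->
        # An explicit endpoint is used unchanged.
        endpoint

      :error ->
        # Otherwise append the default path to the base URL.
        base_url = Keyword.fetch!(opts, :base_url)
        String.trim_trailing(base_url, "/") <> @default_path
    end
  end
end

IO.puts(EndpointSketch.resolve(base_url: "http://localhost:3000"))
IO.puts(EndpointSketch.resolve(endpoint: "https://my-server.my-domain.com/electric/shape/proxy"))
```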
@spec new!(client_options()) :: t() | no_return()
Create a new client with the given options or raise if the configuration is invalid.
@spec shape(String.t(), Electric.Client.ShapeDefinition.options()) :: {:ok, Electric.Client.ShapeDefinition.t()} | {:error, term()}
A shortcut to ShapeDefinition.new/2.
iex> Electric.Client.shape("my_table")
{:ok, %Electric.Client.ShapeDefinition{table: "my_table"}}
@spec shape!(Ecto.Queryable.t()) :: Electric.Client.ShapeDefinition.t() | no_return()
Create a ShapeDefinition from an Ecto query.
Accepts any implementation of Ecto.Queryable (e.g. an %Ecto.Query{} struct or Ecto.Schema module) to generate a ShapeDefinition.
iex> query = from(t in MyApp.Todo, where: t.completed == false)
iex> Electric.Client.shape!(query)
%Electric.Client.ShapeDefinition{table: "todos", where: "(\"completed\" = FALSE)"}
Values from the Electric change stream will be mapped to instances of the passed Ecto.Schema module.
@spec shape!(String.t(), Electric.Client.ShapeDefinition.options()) :: Electric.Client.ShapeDefinition.t() | no_return()
A shortcut to ShapeDefinition.new!/2.
@spec stream(t(), shape(), stream_options()) :: Enumerable.t(message())
Use the client to return a stream of update messages for the given shape.
shape can be a table name, e.g. "my_table", a full ShapeDefinition including a table name and where clause, or (if Ecto is installed) an Ecto.Queryable instance, such as an Ecto.Query or an Ecto.Schema module.
Options
:parser - A {module, args} tuple specifying the Electric.Client.ValueMapper implementation to use for mapping values from the sync stream into Elixir terms. The default value is nil.
:live (boolean/0) - If true, reads an infinite stream of update messages from the server. The default value is true.
:replica - Instructs the server to send just the changed columns for an update (:default) or the full row (:full). The default value is :default.
:oneshot (boolean/0) - Only make a single request and then terminate the stream. The default value is false.
:resume - Resume the stream from the given point. Message.ResumeMessage messages are appended to the change stream if you terminate it early using live: false or oneshot: true.
@spec url(t(), Electric.Client.Fetch.Request.attrs()) :: binary()
Return an authenticated URL for the given request attributes.