Electric.Client (Electric Client v0.7.3)
An Elixir client for ElectricSQL.
Electric is a sync engine that allows you to sync little subsets of data from Postgres into local apps and services. This client allows you to sync data from Electric into Elixir applications.
Quickstart
Start and connect the Electric sync service
Follow the Installation guide to get Electric up and running and connected to a Postgres database.
Create a table
Create a foo table in your Postgres schema, as per the Electric
Quickstart guide,
so that we have a table to sync.
CREATE TABLE foo (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255),
  value FLOAT
);
Install the Electric Client and receive sync events
Create a simple script that subscribes to events from the foo table
in your Postgres database:
# electric.ex
Mix.install([
  :electric_client
])
{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
# You can create a stream from a table name or a Shape defined using
# `ShapeDefinition.new/2`
stream = Electric.Client.stream(client, "foo")
for msg <- stream do
  IO.inspect(msg, pretty: true, syntax_colors: IO.ANSI.syntax_colors())
end
Then run it:
elixir electric.ex
In a separate terminal window, connect to the Postgres database:
psql "postgresql://postgres:password@localhost:54321/electric"
Now any modifications you make to the data in the foo table will appear
as messages in the Elixir process.
INSERT INTO foo (name, value) VALUES
  ('josevalim', 4545),
  ('eksperimental', 966),
  ('lexmag', 677),
  ('whatyouhide', 598),
  ('ericmj', 583),
  ('alco', 377);
UPDATE foo SET value = value + 1;
Filtering Using WHERE clauses
You can subscribe to subsets of the data in your table using
where clauses.
{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
{:ok, shape} = Electric.Client.ShapeDefinition.new("foo", where: "name ILIKE 'a%'")
stream = Electric.Client.stream(client, shape)
for msg <- stream do
  # you will now only receive events for database rows matching the `where` clause
  IO.inspect(msg)
end
Configuration
See new/1 for the configuration options of the client
and stream/3 for the options that configure a stream.
Ecto Integration
If you have Ecto installed then you can define your Shapes using Ecto queries:
# ecto.ex
Mix.install([
  :ecto_sql,
  :electric_client
])
import Ecto.Query, only: [from: 2]
import Ecto.Query.API, only: [ilike: 2]
defmodule Foo do
  use Ecto.Schema
  schema "foo" do
    field :name, :string
    field :value, :float
  end
end
{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
# Replace the table or `ShapeDefinition` with an `Ecto` query and set
# `replica` to `:full` to receive full rows for update messages.
#
# The normal `replica: :default` setting will only send the changed
# columns, so we'd end up with partial `%Foo{}` instances.
stream =
  Electric.Client.stream(
    client,
    from(f in Foo, where: ilike(f.name, "a%")),
    replica: :full
  )
for %{headers: %{operation: operation}, value: value} <- stream do
  # The message `value` will now be a `%Foo{}` struct
  IO.inspect([{operation, value}], pretty: true, syntax_colors: IO.ANSI.syntax_colors())
end
Custom Values
Electric sends all column values as binaries. The Ecto integration uses
Ecto's schema information to turn those into the relevant Elixir terms but
we can provide our own binary() => term() mapping by implementing the
Electric.Client.ValueMapper behaviour.
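To make the binary() => term() idea concrete, here is a minimal sketch in plain Elixir that casts a row of string values into typed terms. It does not implement the Electric.Client.ValueMapper behaviour (see that module's documentation for the actual callbacks). The FooCaster module, its cast_row/1 function and the sample row are hypothetical and exist only for illustration; the column names follow the foo table from the Quickstart.

```elixir
defmodule FooCaster do
  # Hypothetical helper, not part of Electric.Client: casts the string
  # values of a `foo` row into Elixir terms, column by column.
  def cast_row(row) when is_map(row) do
    Map.new(row, fn {column, value} -> {column, cast(column, value)} end)
  end

  # Assumption: NULL columns arrive as nil and are passed through untouched.
  defp cast(_column, nil), do: nil
  defp cast("id", value), do: String.to_integer(value)

  defp cast("value", value) do
    # Float.parse/1 accepts both "377" and "377.5"
    {float, _rest} = Float.parse(value)
    float
  end

  # Every other column (e.g. "name") stays a binary.
  defp cast(_column, value), do: value
end

row = %{"id" => "6", "name" => "alco", "value" => "377"}
IO.inspect(FooCaster.cast_row(row))
```

A real mapper would normally derive the casts from the column type information sent by the server rather than from hard-coded column names.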
Types
@type client_options() :: [client_option()]
@type column() :: %{ :type => String.t(), optional(:pk_index) => non_neg_integer(), optional(:not_null) => boolean(), optional(:max_length) => non_neg_integer(), optional(:length) => non_neg_integer() }
@type cursor() :: integer()
@type ecto_shape() :: Ecto.Queryable.t() | Ecto.Changeset.t() | (map() -> Ecto.Changeset.t())
@type message() :: Electric.Client.Message.ControlMessage.t() | Electric.Client.Message.ChangeMessage.t() | Electric.Client.Message.ResumeMessage.t()
@type offset() :: Electric.Client.Offset.t()
@type replica() :: :default | :full
@type shape() :: table_name() | Electric.Client.ShapeDefinition.t() | Ecto.Queryable.t()
@type shape_handle() :: String.t()
@type stream_option() :: {:parser, nil | {module(), [term()]}} | {:live, boolean()} | {:replica, :default | :full} | {:resume, Electric.Client.Message.ResumeMessage.t() | nil} | {:errors, :raise | :stream}
@type stream_options() :: [stream_option()]
@type table_name() :: String.t()
Functions
@spec authenticate_request(t(), Electric.Client.Fetch.Request.t()) :: Electric.Client.Fetch.Request.authenticated()
Authenticate the given request using the authenticator configured in the Client.
@spec authenticate_shape(t(), Electric.Client.ShapeDefinition.t()) :: Electric.Client.Authenticator.headers()
Get authentication query parameters for the given Electric.Client.ShapeDefinition.
Use the given client to delete the shape instance from the server.
Deleting a shape only works if Electric is configured with allow_shape_deletion.
@spec embedded(Electric.Shapes.Api.options()) :: {:ok, t()} | {:error, term()}
Get a client instance that runs against an embedded instance of Electric, that is, an Electric app running as a dependency of the current application.
@spec embedded!(Electric.Shapes.Api.options()) :: t() | no_return()
@spec new(client_options()) :: {:ok, t()} | {:error, term()}
Create a new client.
Options
- :base_url - The URL of the Electric server, not including the path. E.g. for local development this would be http://localhost:3000.
- :endpoint - The full URL of the shape API endpoint. E.g. for local development this would be http://localhost:3000/v1/shape. Use this if you need a non-standard API path.
- :params (map of atom/0 keys and term/0 values) - Additional query parameters to include in every request to the Electric backend. The default value is %{}.
- :parser - A {module, args} tuple specifying the Electric.Client.ValueMapper implementation to use for mapping values from the sync stream into Elixir terms. The default value is {Electric.Client.ValueMapper, []}.
- :fetch - A {module, opts} tuple specifying the Electric.Client.Fetch implementation to use for calling the Electric API. See Electric.Client.Fetch.HTTP for the options available when using the default HTTP fetcher. The default value is {Electric.Client.Fetch.HTTP, []}.
Client.new(
  base_url: "http://localhost:3000",
  fetch:
    {Electric.Client.Fetch.HTTP,
     # never error, just keep retrying if the Electric server is down
     timeout: :infinity,
     # add a bearer token to every request (see `authenticator` if you need more
     # control over authenticating your requests)
     headers: [{"authorization", "Bearer some-token-here"}]}
)
:base_url vs. :endpoint
If you configure your client using e.g. base_url: "http://localhost:3000"
Electric will append the default shape API path
"/v1/shape" to create the final endpoint configuration,
in this case "http://localhost:3000/v1/shape".
If you wish to use a non-standard endpoint path because, for example, you wrap your Shape API calls in an authentication proxy, then configure the endpoint directly:
Client.new(endpoint: "https://my-server.my-domain.com/electric/shape/proxy")
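The relationship between the two options can be sketched with Elixir's standard URI module. This only mirrors the behaviour described above and is not the client's actual implementation; the EndpointSketch module and its resolve/1 function are hypothetical names.

```elixir
defmodule EndpointSketch do
  # Default shape API path, as stated in the documentation above.
  @default_path "/v1/shape"

  # Hypothetical helper: returns the endpoint a client would end up with.
  def resolve(opts) do
    case Keyword.fetch(opts, :endpoint) do
      {:ok, endpoint} ->
        # An explicit :endpoint is used exactly as given.
        endpoint

      :error ->
        # Otherwise the default path is set on :base_url, which is
        # documented as not including a path of its own.
        base = opts |> Keyword.fetch!(:base_url) |> URI.parse()
        URI.to_string(%{base | path: @default_path})
    end
  end
end

IO.puts(EndpointSketch.resolve(base_url: "http://localhost:3000"))
IO.puts(EndpointSketch.resolve(endpoint: "https://my-server.my-domain.com/electric/shape/proxy"))
```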
@spec new!(client_options()) :: t() | no_return()
Create a new client with the given options or raise if the configuration is invalid.
@spec shape(String.t(), Electric.Client.ShapeDefinition.options()) :: {:ok, Electric.Client.ShapeDefinition.t()} | {:error, term()}
A shortcut to ShapeDefinition.new/2.
iex> Electric.Client.shape("my_table")
{:ok, %Electric.Client.ShapeDefinition{table: "my_table"}}
@spec shape!(ecto_shape()) :: Electric.Client.ShapeDefinition.t() | no_return()
Create a ShapeDefinition from an Ecto query.
Accepts any implementation of Ecto.Queryable (e.g. an %Ecto.Query{} struct or
Ecto.Schema module) to generate a ShapeDefinition.
iex> query = from(t in MyApp.Todo, where: t.completed == false)
iex> Electric.Client.shape!(query)
%Electric.Client.ShapeDefinition{table: "todos", where: "(\"completed\" = FALSE)"}
Also takes an Ecto.Changeset or 1-arity function returning an Ecto.Changeset:
iex> Electric.Client.shape!(&MyApp.Todo.changeset/1)
%Electric.Client.ShapeDefinition{table: "todos", columns: ["title", "completed"]}
Values from the Electric change stream will be mapped to instances of the
passed Ecto.Schema module.
Column subsets
Specifying a subset of columns to stream can be done in two ways:
- Passing a changeset will filter the table columns according to the applied validations, so if you want a shape to include a column, you must ensure that it has some kind of validation, using e.g. Ecto.Changeset.validate_required/2.
- Using Ecto.Query.select/3 to select a subset of columns within the query:
  Electric.Client.shape!(
    from(t in MyApp.Todo, where: t.completed == false, select: [:id, :title])
  )
select/3 allows for various ways to specify the columns,
but only the following forms are supported:
select(Todo, [t], [:id, :name])
select(Todo, [t], [t.id, t.name])
select(Todo, [t], {t.id, t.name})
select(Todo, [t], struct(t, [:id, :name]))
select(Todo, [t], map(t, [:id, :name]))
You cannot add virtual columns like this:
select(Todo, [t], map(t, %{t | reason: "here"}))
We also support Ecto.Query.select_merge/3 to add additional columns:
select(Todo, [t], map(t, [:id, :name])) |> select_merge([:completed])
@spec shape!(String.t(), Electric.Client.ShapeDefinition.options()) :: Electric.Client.ShapeDefinition.t() | no_return()
A shortcut to ShapeDefinition.new!/2.
@spec stream(t(), stream_options()) :: Enumerable.t(message())
@spec stream(t(), Electric.Client.ShapeDefinition.t()) :: Enumerable.t(message())
@spec stream(t(), String.t() | ecto_shape()) :: Enumerable.t(message())
@spec stream(String.t(), stream_options()) :: Enumerable.t(message())
Get a stream of update messages.
This accepts a variety of arguments:
Examples:
- Using a custom endpoint that returns a pre-defined shape.
  If you have used Electric.Phoenix to mount a pre-defined shape into your Phoenix application, then by creating a client with the endpoint set to the URL of this route you can stream data directly from this client:
  {:ok, client} = Electric.Client.new(endpoint: "http://localhost:4000/shapes/todo")
  stream = Electric.Client.stream(client)
  Equivalently you can just pass the URL as the argument to stream:
  stream = Electric.Client.stream("http://localhost:4000/shapes/todo")
- Using a simple table name to define a shape:
  {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
  stream = Electric.Client.stream(client, "todos")
- Using a full shape definition:
  {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
  {:ok, shape} = Electric.Client.shape("todos", where: "completed = false", replica: :full)
  stream = Electric.Client.stream(client, shape)
- Or with an Ecto query or Ecto.Schema module:
  {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
  stream = Electric.Client.stream(client, from(t in MyApp.Todos.Todo, where: t.completed == false))
If you want to pass options to your stream, then pass them as the second
argument or use stream/3.
@spec stream(t(), shape(), stream_options()) :: Enumerable.t(message())
Use the client to return a stream of update messages for the given shape.
shape can be a table name, e.g. "my_table", a full ShapeDefinition
including a table name and where clause, or (if Ecto is installed) an
Ecto.Queryable instance, such as an Ecto.Query or an Ecto.Schema module.
Options
- :parser - A {module, args} tuple specifying the Electric.Client.ValueMapper implementation to use for mapping values from the sync stream into Elixir terms. The default value is nil.
- :live (boolean/0) - If true (the default) reads an infinite stream of update messages from the server. The default value is true.
- :replica - Instructs the server to send just the changed columns for an update (:default) or the full row (:full). The default value is :default.
- :resume - Resume the stream from the given point. Message.ResumeMessage messages are appended to the change stream if you terminate it early using live: false.
- :errors - How errors from the Electric server should be handled. The default value is :raise.
  - :raise (default) - raise an exception if the server returns an error
  - :stream - put the error into the message stream (and terminate)
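As an illustration of how :live and :resume might be combined, the sketch below reads a finite stream and then resumes from where it stopped. It assumes an Electric server at http://localhost:3000 and the foo table from the Quickstart, and it relies only on the options and message types documented on this page. Whether a ResumeMessage is present in a given run is an assumption, so the code falls back to nil, which the stream_option() type also allows.

```elixir
Mix.install([:electric_client])

alias Electric.Client
alias Electric.Client.Message.{ChangeMessage, ResumeMessage}

{:ok, client} = Client.new(base_url: "http://localhost:3000")

# With `live: false` the stream is finite rather than infinite,
# so it can be collected into a list.
messages = client |> Client.stream("foo", live: false) |> Enum.to_list()

# Keep only the change messages for processing.
changes = for %ChangeMessage{} = msg <- messages, do: msg
IO.inspect(length(changes), label: "changes received")

# Look for the resume message that is documented as being appended
# to the stream; `Enum.find/2` returns nil if there is none.
resume = Enum.find(messages, &match?(%ResumeMessage{}, &1))

# Pick up from the saved point. This second stream uses the default
# `live: true`, so the loop keeps running until the process is stopped.
for msg <- Client.stream(client, "foo", resume: resume) do
  IO.inspect(msg, pretty: true)
end
```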
@spec url(t(), Electric.Client.Fetch.Request.attrs()) :: binary()
Return an authenticated URL for the given request attributes.