Electric.Client (Electric Client v0.2.1)

An Elixir client for ElectricSQL.

Electric is a sync engine that allows you to sync little subsets of data from Postgres into local apps and services. This client allows you to sync data from Electric into Elixir applications.

Quickstart

Start and connect the Electric sync service

Follow the Installation guide to get Electric up-and-running and connected to a Postgres database.

Create a table

Create a foo table in your Postgres schema, as per the Electric Quickstart guide, so that we have a table to sync.

CREATE TABLE foo (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    value FLOAT
);

Install the Electric Client and receive sync events

Create a simple script that subscribes to events from the foo table in your Postgres database:

# electric.ex
Mix.install([
  :electric_client
])

{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

# You can create a stream from a table name or a Shape defined using
# `ShapeDefinition.new/2`
stream = Electric.Client.stream(client, "foo")

for msg <- stream do
  IO.inspect(msg, pretty: true, syntax_colors: IO.ANSI.syntax_colors())
end

Then run it:

elixir electric.ex

In a separate terminal window, connect to the Postgres database:

psql "postgresql://postgres:password@localhost:54321/electric"

Now any modifications you make to the data in the foo table will appear as messages in the Elixir process.

INSERT INTO foo (name, value) VALUES
    ('josevalim', 4545),
    ('eksperimental', 966),
    ('lexmag', 677),
    ('whatyouhide', 598),
    ('ericmj', 583),
    ('alco', 377);

UPDATE foo SET value = value + 1;
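As a rough illustration of what a consumer might do with these messages, the sketch below folds insert, update and delete messages into an in-memory map of rows. It is not part of the client. The module name Sketch.Materialize, keying rows on the "id" column, the operation atoms :insert, :update and :delete, and the assumption that partial updates carry the primary key are all illustrative assumptions. Plain maps stand in for the client's message structs so the script runs without a server.

```elixir
defmodule Sketch.Materialize do
  # Hypothetical helper, not part of Electric.Client.
  # Folds a single change message into a map of rows keyed by the "id" column.

  # Inserts carry the full row.
  def apply_change(rows, %{headers: %{operation: :insert}, value: value}) do
    Map.put(rows, value["id"], value)
  end

  # Updates may carry only the changed columns (assumed to include the
  # primary key), so merge them into the row we already hold.
  def apply_change(rows, %{headers: %{operation: :update}, value: value}) do
    Map.update(rows, value["id"], value, &Map.merge(&1, value))
  end

  def apply_change(rows, %{headers: %{operation: :delete}, value: value}) do
    Map.delete(rows, value["id"])
  end

  # Anything without an operation and a value leaves the rows untouched.
  def apply_change(rows, _other), do: rows
end

# Stand-ins for stream messages; column values are binaries.
messages = [
  %{headers: %{operation: :insert}, value: %{"id" => "1", "name" => "alco", "value" => "377"}},
  %{headers: %{operation: :insert}, value: %{"id" => "2", "name" => "lexmag", "value" => "677"}},
  %{headers: %{operation: :update}, value: %{"id" => "1", "value" => "378"}},
  %{headers: %{operation: :delete}, value: %{"id" => "2"}},
  # A message with no operation, standing in for non-change messages.
  %{headers: %{}}
]

rows =
  Enum.reduce(messages, %{}, fn msg, acc ->
    Sketch.Materialize.apply_change(acc, msg)
  end)

IO.inspect(rows)
# → %{"1" => %{"id" => "1", "name" => "alco", "value" => "378"}}
```

With a real stream, the same reducer could be passed to Enum.reduce/3 over a finite stream, or called from inside the for comprehension.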

Filtering Using WHERE clauses

You can subscribe to subsets of the data in your table using where clauses.

{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

{:ok, shape} = Electric.Client.ShapeDefinition.new("foo", where: "name ILIKE 'a%'")

stream = Electric.Client.stream(client, shape)

for msg <- stream do
  # you will now only receive events for database rows matching the `where` clause
end

Configuration

See new/1 for the client's configuration options and stream/3 for the options that configure a stream.

Ecto Integration

If you have Ecto installed then you can define your Shapes using Ecto queries:

# ecto.ex
Mix.install([
  :ecto_sql,
  :electric_client
])

import Ecto.Query, only: [from: 2]
import Ecto.Query.API, only: [ilike: 2]

defmodule Foo do
  use Ecto.Schema

  schema "foo" do
    field :name, :string
    field :value, :float
  end
end

{:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

# Replace the table or `ShapeDefinition` with an `Ecto` query and set
# `replica` to `:full` to receive full rows for update messages.
#
# The normal `replica: :default` setting will only send the changed
# columns, so we'd end up with partial `%Foo{}` instances.
stream =
  Electric.Client.stream(
    client,
    from(f in Foo, where: ilike(f.name, "a%")),
    replica: :full
  )

for %{headers: %{operation: operation}, value: value} <- stream do
  # The message `value` will now be a `%Foo{}` struct 
  IO.inspect([{operation, value}], pretty: true, syntax_colors: IO.ANSI.syntax_colors())
end

Custom Values

Electric sends all column values as binaries. The Ecto integration uses Ecto's schema information to turn those into the relevant Elixir terms but we can provide our own binary() => term() mapping by implementing the Electric.Client.ValueMapper behaviour.
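To give a feel for what such a mapping involves, here is a minimal, self-contained sketch. It assumes the behaviour asks for a function named for_schema/2 that receives the shape's schema (see the schema() type) and returns a function from a row of binaries to Elixir terms. That callback name, the module name Sketch.FloatMapper, and the Postgres type names "int4" and "float8" are assumptions for illustration, so check the Electric.Client.ValueMapper documentation for the real contract.

```elixir
defmodule Sketch.FloatMapper do
  # Hypothetical mapper. A real implementation would also declare
  # `@behaviour Electric.Client.ValueMapper`; that is left out here so the
  # sketch runs without the library installed.

  # Build one cast function per column, then return a row-mapping function.
  def for_schema(schema, _opts) do
    casts =
      Map.new(schema, fn {column, %{type: type}} -> {column, cast_fun(type)} end)

    fn row ->
      Map.new(row, fn {column, value} ->
        # Columns missing from the schema are passed through unchanged.
        cast = Map.get(casts, column, fn v -> v end)
        {column, cast.(value)}
      end)
    end
  end

  # Assumed type names; unknown types keep their binary value.
  defp cast_fun("int4"), do: &to_int/1
  defp cast_fun("float8"), do: &to_float/1
  defp cast_fun(_other), do: fn v -> v end

  defp to_int(nil), do: nil
  defp to_int(bin), do: String.to_integer(bin)

  defp to_float(nil), do: nil

  defp to_float(bin) do
    # Raises a MatchError if the binary is not a complete float literal.
    {float, ""} = Float.parse(bin)
    float
  end
end

# A schema in the shape of the `schema()` type for the foo table.
schema = %{
  "id" => %{type: "int4", pk_index: 0, not_null: true},
  "name" => %{type: "varchar", max_length: 255},
  "value" => %{type: "float8"}
}

mapper = Sketch.FloatMapper.for_schema(schema, [])
mapped = mapper.(%{"id" => "1", "name" => "alco", "value" => "378"})
IO.inspect(mapped)
```

A module like this would be passed to stream/3 through the :parser option as a {module, args} tuple.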

Types

@type client_option() ::
  {:base_url, binary()}
  | {:endpoint, binary()}
  | {:database_id, nil | binary()}
  | {:fetch, {module(), [term()]}}
  | {:authenticator, {module(), [term()]}}
@type client_options() :: [client_option()]
@type column() :: %{
  :type => String.t(),
  optional(:pk_index) => non_neg_integer(),
  optional(:not_null) => boolean(),
  optional(:max_length) => non_neg_integer(),
  optional(:length) => non_neg_integer()
}
@type cursor() :: integer()
@type replica() :: :default | :full
@type schema() :: %{required(String.t()) => column()}
@type shape_handle() :: String.t()
@type stream_option() ::
  {:parser, nil | {module(), [term()]}}
  | {:live, boolean()}
  | {:replica, :default | :full}
  | {:oneshot, boolean()}
  | {:resume, Electric.Client.Message.ResumeMessage.t() | nil}
@type stream_options() :: [stream_option()]
@type t() :: %Electric.Client{
  authenticator: term(),
  database_id: term(),
  endpoint: URI.t(),
  fetch: {module(), term()}
}
@type table_name() :: String.t()

Functions

authenticate_request(client, request)

Authenticate the given request using the authenticator configured in the Client.

authenticate_shape(client, shape)

Get authentication query parameters for the given Electric.Client.ShapeDefinition.

delete_shape(client, shape)

Use the given client to delete the shape instance from the server.

Deleting a shape only works if Electric is configured to allow it (see allow_shape_deletion).

@spec new(client_options()) :: {:ok, t()} | {:error, term()}

Create a new client.

Options

  • :base_url (String.t/0) - The URL of the Electric server, not including the path. E.g. for local development this would be http://localhost:3000.

  • :endpoint (String.t/0) - The full URL of the shape API endpoint. E.g. for local development this would be http://localhost:3000/v1/shape. Use this if you need a non-standard API path.

  • :database_id - Which database to use, optional unless Electric is used with multiple databases.

:base_url vs. :endpoint

If you configure your client using e.g. base_url: "http://localhost:3000" Electric will append the default shape API path "/v1/shape" to create the final endpoint configuration, in this case "http://localhost:3000/v1/shape".
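The path-appending rule can be sketched with the standard URI module. This is only an illustration of the behaviour described above, not the client's actual implementation. The module name Sketch.Endpoint and the choice to prefer :endpoint when both options are given are assumptions.

```elixir
defmodule Sketch.Endpoint do
  # Hypothetical helper illustrating how the final endpoint could be derived.
  @default_path "/v1/shape"

  # Use an explicit :endpoint as-is; otherwise append the default shape API
  # path to :base_url. Raises if neither option is a binary.
  def resolve(opts) do
    case {Keyword.get(opts, :endpoint), Keyword.get(opts, :base_url)} do
      {endpoint, _} when is_binary(endpoint) ->
        URI.parse(endpoint)

      {nil, base_url} when is_binary(base_url) ->
        %{URI.parse(base_url) | path: @default_path}
    end
  end
end

from_base =
  [base_url: "http://localhost:3000"]
  |> Sketch.Endpoint.resolve()
  |> URI.to_string()

from_endpoint =
  [endpoint: "https://my-server.my-domain.com/electric/shape/proxy"]
  |> Sketch.Endpoint.resolve()
  |> URI.to_string()

IO.puts(from_base)
# → http://localhost:3000/v1/shape
IO.puts(from_endpoint)
# → https://my-server.my-domain.com/electric/shape/proxy
```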

If you wish to use a non-standard endpoint path because, for example, you wrap your Shape API calls in an authentication proxy, then configure the endpoint directly:

Client.new(endpoint: "https://my-server.my-domain.com/electric/shape/proxy")

@spec new!(client_options()) :: t() | no_return()

Create a new client with the given options or raise if the configuration is invalid.

shape(table_name, opts \\ [])

A shortcut to ShapeDefinition.new/2.

iex> Electric.Client.shape("my_table")
{:ok, %Electric.Client.ShapeDefinition{table: "my_table"}}

Create a ShapeDefinition from an Ecto query.

Accepts any implementation of Ecto.Queryable (e.g. an %Ecto.Query{} struct or Ecto.Schema module) to generate a ShapeDefinition.

iex> query = from(t in MyApp.Todo, where: t.completed == false)
iex> Electric.Client.shape!(query)
%Electric.Client.ShapeDefinition{table: "todos", where: "(\"completed\" = FALSE)"}

Values from the Electric change stream will be mapped to instances of the passed Ecto.Schema module.

shape!(table_or_query, opts \\ [])

A shortcut to ShapeDefinition.new!/2.

stream(client, shape_or_query, opts \\ [])
@spec stream(t(), shape(), stream_options()) :: Enumerable.t(message())

Use the client to return a stream of update messages for the given shape.

shape can be a table name, e.g. "my_table", a full ShapeDefinition including a table name and where clause, or (if Ecto is installed) an Ecto.Queryable instance, such as an Ecto.Query or an Ecto.Schema module.

Options

  • :parser - A {module, args} tuple specifying the Electric.Client.ValueMapper implementation to use for mapping values from the sync stream into Elixir terms. The default value is nil.

  • :live (boolean/0) - If true, reads an infinite stream of update messages from the server. The default value is true.

  • :replica - Instructs the server to send just the changed columns for an update (:default) or the full row (:full). The default value is :default.

  • :oneshot (boolean/0) - Only make a single request and then terminate the stream. The default value is false.

  • :resume - Resume the stream from the given point. Message.ResumeMessage messages are appended to the change stream if you terminate it early using live: false or oneshot: true.

Return an authenticated URL for the given request attributes.