# `Electric.Client`
[🔗](https://github.com/electric-sql/electric/tree/%40core/elixir-client%400.9.4/packages/elixir-client/lib/electric/client.ex#L1)

An Elixir client for [ElectricSQL](https://electric-sql.com).

Electric is a sync engine that allows you to sync
[little subsets](https://electric-sql.com/docs/guides/shapes)
of data from Postgres into local apps and services. This client
allows you to sync data from Electric into Elixir applications.

## Quickstart

### Start and connect the Electric sync service

Follow the
[Installation guide](https://electric-sql.com/docs/guides/installation)
to get Electric up-and-running and connected to a Postgres database.

### Create a table

Create a `foo` table in your Postgres schema, as per the Electric
[Quickstart guide](https://electric-sql.com/docs/guides/installation),
so that we have a table to sync.

    CREATE TABLE foo (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255),
        value FLOAT
    );

### Install the Electric Client and receive sync events

Create a simple script that will subscribe to events from a `foo` table
in your Postgres database,

    # electric.ex
    Mix.install([
      :electric_client
    ])

    {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

    # You can create a stream from a table name or a Shape defined using
    # `ShapeDefinition.new/2`
    stream = Electric.Client.stream(client, "foo")

    for msg <- stream do
      IO.inspect(msg, pretty: true, syntax_colors: IO.ANSI.syntax_colors())
    end

Then run it:

    elixir electric.ex

In a separate terminal window, connect to the Postgres database:

    psql "postgresql://postgres:password@localhost:54321/electric"

Now any modifications you make to the data in the `foo` table will appear
as messages in the elixir process.

    INSERT INTO foo (name, value) VALUES
        ('josevalim', 4545),
        ('eksperimental', 966),
        ('lexmag', 677),
        ('whatyouhide', 598),
        ('ericmj', 583),
        ('alco', 377);

    UPDATE foo SET value = value + 1;

### Filtering Using WHERE clauses

You can subscribe to subsets of the data in your table using
[`where` clauses](https://electric-sql.com/docs/guides/shapes#where-clause).

    {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

    {:ok, shape} = Electric.Client.ShapeDefinition.new("foo", where: "name ILIKE 'a%'")

    stream = Electric.Client.stream(client, shape)

    for msg <- stream do
      # you will now only receive events for database rows matching the `where` clause
    end

## Configuration

See `new/1` for configuration options of the client itself,
and `stream/3` for details on configuring the stream itself.

## Ecto Integration

If you have [Ecto](https://hexdocs.pm/ecto) installed then you can define your Shapes
using Ecto queries:

    # ecto.ex
    Mix.install([
      :ecto_sql,
      :electric_client
    ])

    import Ecto.Query, only: [from: 2]
    import Ecto.Query.API, only: [ilike: 2]

    defmodule Foo do
      use Ecto.Schema

      schema "foo" do
        field :name, :string
        field :value, :float
      end
    end

    {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")

    # Replace the table or `ShapeDefinition` with an `Ecto` query and set
    # `replica` to `:full` to receive full rows for update messages.
    #
    # The normal `replica: :default` setting will only send the changed
    # columns, so we'd end up with partial `%Foo{}` instances.
    stream =
      Electric.Client.stream(
        client,
        from(f in Foo, where: ilike(f.name, "a%")),
        replica: :full
      )

    for %{headers: %{operation: operation}, value: value} <- stream do
      # The message `value` will now be a `%Foo{}` struct
      IO.inspect([{operation, value}], pretty: true, syntax_colors: IO.ANSI.syntax_colors())
    end

## Custom Values

Electric sends all column values as binaries. The `Ecto` integration uses
`Ecto`'s schema information to turn those into the relevant Elixir terms but
we can provide our own `binary() => term()` mapping by implementing the
[`Electric.Client.ValueMapper` behaviour](`Electric.Client.ValueMapper`).

# `client_option`

```elixir
@type client_option() ::
  {:base_url, binary() | struct()}
  | {:endpoint, binary() | struct()}
  | {:params, %{optional(atom()) =&gt; term()}}
  | {:parser, {module(), [term()]}}
  | {:authenticator, {module(), [term()]}}
  | {:fetch, {module(), [term()]}}
  | {:pool, {module(), [term()]}}
```

# `client_options`

```elixir
@type client_options() :: [client_option()]
```

# `column`

```elixir
@type column() :: %{
  :type =&gt; String.t(),
  optional(:pk_index) =&gt; non_neg_integer(),
  optional(:not_null) =&gt; boolean(),
  optional(:max_length) =&gt; non_neg_integer(),
  optional(:length) =&gt; non_neg_integer()
}
```

# `cursor`

```elixir
@type cursor() :: integer()
```

# `ecto_shape`

```elixir
@type ecto_shape() ::
  Ecto.Queryable.t() | Ecto.Changeset.t() | (map() -&gt; Ecto.Changeset.t())
```

# `message`

```elixir
@type message() ::
  Electric.Client.Message.ControlMessage.t()
  | Electric.Client.Message.ChangeMessage.t()
  | Electric.Client.Message.ResumeMessage.t()
```

# `offset`

```elixir
@type offset() :: Electric.Client.Offset.t()
```

# `poll_option`

```elixir
@type poll_option() :: {:replica, replica()}
```

# `poll_options`

```elixir
@type poll_options() :: [poll_option()]
```

# `replica`

```elixir
@type replica() :: :default | :full
```

# `schema`

```elixir
@type schema() :: %{required(String.t()) =&gt; column()}
```

# `shape`

```elixir
@type shape() ::
  table_name() | Electric.Client.ShapeDefinition.t() | Ecto.Queryable.t()
```

# `shape_handle`

```elixir
@type shape_handle() :: String.t()
```

# `stream_option`

```elixir
@type stream_option() ::
  {:parser, nil | {module(), [term()]}}
  | {:live, boolean()}
  | {:replica, :default | :full}
  | {:resume, Electric.Client.Message.ResumeMessage.t() | nil}
  | {:errors, :raise | :stream}
```

# `stream_options`

```elixir
@type stream_options() :: [stream_option()]
```

# `t`

```elixir
@type t() :: %Electric.Client{
  authenticator: term(),
  endpoint: URI.t(),
  fetch: {module(), term()},
  params: term(),
  parser: term(),
  pool: term()
}
```

# `table_name`

```elixir
@type table_name() :: String.t()
```

# `authenticate_request`

```elixir
@spec authenticate_request(t(), Electric.Client.Fetch.Request.t()) ::
  Electric.Client.Fetch.Request.authenticated()
```

Authenticate the given request using the `authenticator` configured in the `Client`.

# `authenticate_shape`

```elixir
@spec authenticate_shape(t(), Electric.Client.ShapeDefinition.t()) ::
  Electric.Client.Authenticator.headers()
```

Get authentication query parameters for the given `Elixir.Electric.Client.ShapeDefinition`.

# `delete_shape`

Use the given `client` to delete the `shape` instance from the server.

Delete shape only works if Electric is configured to `allow_shape_deletion`.

# `embedded`

```elixir
@spec embedded(Electric.Shapes.Api.options()) :: {:ok, t()} | {:error, term()}
```

Get a client instance that runs against an embedded instance of electric,
that is an electric app running as a dependency of the current application.

# `embedded!`

```elixir
@spec embedded!(Electric.Shapes.Api.options()) :: t() | no_return()
```

# `new`

```elixir
@spec new(client_options()) :: {:ok, t()} | {:error, term()}
```

Create a new client.

## Options

* `:base_url` - The URL of the electric server, not including the path. E.g. for local development this would be `http://localhost:3000`.

* `:endpoint` - The full URL of the shape API endpoint. E.g. for local development this would be `http://localhost:3000/v1/shape`. Use this if you need a non-standard API path.

* `:params` (map of `t:atom/0` keys and `t:term/0` values) - Additional query parameters to include in every request to the Electric backend. The default value is `%{}`.

* `:parser` - A `{module, args}` tuple specifying the `Electric.Client.ValueMapper`
  implementation to use for mapping values from the sync stream into Elixir
  terms. The default value is `{Electric.Client.ValueMapper, []}`.

* `:fetch` - A `{module, opts}` tuple specifying the `Electric.Client.Fetch`
  implementation to use for calling the Electric API.

  See `Electric.Client.Fetch.HTTP` for the options available
  when using the default `HTTP` fetcher.

       Client.new(
         base_url: "http://localhost:3000",
         fetch: {Electric.Client.Fetch.HTTP,
           # never error, just keep retrying if the Electric server is down
           timeout: :infinity,
           # add a bearer token to every request (see `authenticator` if you need more
           # control over authenticating your requests
           headers: [{"authorize", "Bearer some-token-here"}]}
       )

  The default value is `{Electric.Client.Fetch.HTTP, []}`.

### `:base_url` vs. `:endpoint`

If you configure your client using e.g. `base_url: "http://localhost:3000"`
Electric will append the default shape API path
`"/v1/shape"` to create the final endpoint configuration,
in this case `"http://localhost:3000/v1/shape"`.

If you wish to use a non-standard endpoint path because, for example, you wrap your Shape
API calls in an [authentication
proxy](https://electric-sql.com/docs/guides/auth), then configure
the endpoint directly:

    Client.new(endpoint: "https://my-server.my-domain.com/electric/shape/proxy")

# `new!`

```elixir
@spec new!(client_options()) :: t() | no_return()
```

Create a new client with the given options or raise if the configuration is
invalid.

# `poll`

```elixir
@spec poll(t(), shape(), Electric.Client.ShapeState.t(), poll_options()) ::
  {:ok, [message()], Electric.Client.ShapeState.t()}
  | {:must_refetch, [message()], Electric.Client.ShapeState.t()}
  | {:error, Electric.Client.Error.t()}
```

Make a single long-polling request to fetch shape changes.

Unlike `stream/3` which returns an `Enumerable` that continuously fetches,
`poll/4` makes a single request and returns explicit results. This is useful
when you want:

- Explicit control over request timing (rate limiting, batching)
- Request-response semantics (not a continuous stream)
- Integration with existing event loops or supervision trees
- Simpler error handling without stream complexity

## Arguments

  * `client` - The Electric client
  * `shape` - The shape definition (table name, `ShapeDefinition`, or Ecto query)
  * `state` - The current polling state (use `ShapeState.new()` for initial request)
  * `opts` - Options:
    * `:replica` - `:default` or `:full` (default: `:default`)

## Returns

  * `{:ok, messages, new_state}` - Success, messages received. Use `new_state` for next poll.
  * `{:must_refetch, messages, new_state}` - Shape was reset (409). Local state should be cleared.
  * `{:error, error}` - Error occurred

## Example

    # Create initial state
    state = Electric.Client.ShapeState.new()

    # First poll gets initial snapshot (non-live request)
    {:ok, messages, state} = Electric.Client.poll(client, "items", state, replica: :full)

    # Process messages...
    Enum.each(messages, &process_message/1)

    # Subsequent polls are live (long-poll until changes)
    {:ok, messages, state} = Electric.Client.poll(client, "items", state, replica: :full)

## Behavior

- First request (when `state.up_to_date? == false`): Makes a non-live request to get initial snapshot
- Subsequent requests (when `state.up_to_date? == true`): Makes a live request that long-polls until changes
- Handles synthetic deletes from move-out events automatically

# `shape`

```elixir
@spec shape(String.t(), Electric.Client.ShapeDefinition.options()) ::
  {:ok, Electric.Client.ShapeDefinition.t()} | {:error, term()}
```

A shortcut to [`ShapeDefinition.new/2`](`Electric.Client.ShapeDefinition.new/2`).

    iex> Elixir.Client.shape("my_table")
    {:ok, %Electric.Client.ShapeDefinition{table: "my_table"}}

# `shape!`

```elixir
@spec shape!(ecto_shape()) :: Electric.Client.ShapeDefinition.t() | no_return()
```

Create a [`ShapeDefinition`](`Electric.Client.ShapeDefinition`) from an `Ecto` query.

Accepts any implementation of `Ecto.Queryable` (e.g. an [`%Ecto.Query{}`](`Ecto.Query`) struct or
`Ecto.Schema` module) to generate a [`ShapeDefinition`](`Electric.Client.ShapeDefinition`).

    iex> query = from(t in MyApp.Todo, where: t.completed == false)
    iex> Elixir.Client.shape!(query)
    %Electric.Client.ShapeDefinition{table: "todos", where: "(\"completed\" = FALSE)"}

Also takes an `Ecto.Changeset` or 1-arity function returning an `Ecto.Changeset`:

    iex> Elixir.Client.shape!(&MyApp.Todo.changeset/1)
    %Electric.Client.ShapeDefinition{table: "todos", columns: ["title", "completed"]}

Values from the Electric change stream will be mapped to instances of the
passed `Ecto.Schema` module.

## Column subsets

Specifying a subset of columns to stream can be done in two ways:

1. Passing a changeset will filter the table columns according to the applied
validations so if you want a shape to include a column, you must ensure
that it has some kind of validation, using e.g.
`Ecto.Changeset.validate_required/2`

2. Using `Ecto.Query.select/3` to select a subset of columns within the query:

    Electric.Client.shape!(
      from(t in MyApp.Todo, where: t.completed == false, select: [:id, :title])
    )

[`select/3`](`Ecto.Query.select/3`) allows for various ways to specify the columns,
we only support the following forms:

- `select(Todo, [t], [:id, :name])`
- `select(Todo, [t], [t.id, t.name])`
- `select(Todo, [t], {t.id, t.name})`
- `select(Todo, [t], struct(t, [:id, :name]))`
- `select(Todo, [t], map(t, [:id, :name]))`

You definitely **can't** add virtual columns like this:

- `select(Todo, [t], map(t, %{t | reason: "here"}))`

We also support `Ecto.Query.select_merge/3` to add additional columns:

- `select(Todo, [t], map(t, [:id, :name])) |> select_merge([:completed])`

# `shape!`

```elixir
@spec shape!(String.t(), Electric.Client.ShapeDefinition.options()) ::
  Electric.Client.ShapeDefinition.t() | no_return()
```

A shortcut to [`ShapeDefinition.new!/2`](`Electric.Client.ShapeDefinition.new!/2`).

# `stream`

```elixir
@spec stream(t(), stream_options()) :: Enumerable.t(message())
@spec stream(t(), Electric.Client.ShapeDefinition.t()) :: Enumerable.t(message())
@spec stream(t(), String.t() | ecto_shape()) :: Enumerable.t(message())
@spec stream(String.t(), stream_options()) :: Enumerable.t(message())
```

Get a stream of update messages.

This accepts a variety of arguments:

### Examples:

- Using a custom endpoint that returns a pre-defined shape.

  If you have used `Electric.Phoenix` to mount a pre-defined shape into your
  Phoenix application, then by creating a client with the `endpoint` set to the
  URL of this route you can stream data directly from this client:

      {:ok, client} = Electric.Client.new(endpoint: "http://localhost:4000/shapes/todo")
      stream = Electric.Client.stream(client)

  Equivalently you can just pass the URL as the argument to stream:

      stream = Electric.Client.stream("http://localhost:4000/shapes/todo")

- Using a simple table name to define a shape:

      {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
      stream = Electric.Client.stream(client, "todos")

- Using a full shape definition:

      {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
      {:ok, shape} = Electric.Client.shape("todos", where: "completed = false", replica: :full)
      stream = Electric.Client.stream(client, shape)

- Or with an `Ecto` query or `Ecto.Schema` struct:

      {:ok, client} = Electric.Client.new(base_url: "http://localhost:3000")
      stream = Electric.Client.stream(client, from(t in MyApp.Todos.Todo, where: t.completed == false))

If you want to pass options to your stream, then pass them as the second
argument or use `stream/3`.

# `stream`

```elixir
@spec stream(t(), shape(), stream_options()) :: Enumerable.t(message())
```

Use the `client` to return a stream of update messages for the given `shape`.

`shape` can be a table name, e.g. `"my_table"`, a full `ShapeDefinition`
including a table name and `where` clause, or (if `Ecto` is installed) an
`Ecto.Queryable` instance, such as an `Ecto.Query` or a `Ecto.Schema` struct.

## Options

* `:parser` - A `{module, args}` tuple specifying the `Electric.Client.ValueMapper`
  implementation to use for mapping values from the sync stream into Elixir
  terms. The default value is `nil`.

* `:live` (`t:boolean/0`) - If `true` (the default) reads an infinite stream of update messages from the server. The default value is `true`.

* `:replica` - Instructs the server to send just the changed columns for an update (`:modified`) or the full row (`:full`). The default value is `:default`.

* `:resume` - Resume the stream from the given point. `Message.ResumeMessage` messages
  are appended to the change stream if you terminate it early using `live:
  false`

* `:errors` - How errors from the Electric server should be handled.

  - `:raise` (default) - raise an exception if the server returns an error
  - `:stream` - put the error into the message stream (and terminate)

  The default value is `:raise`.

# `url`

```elixir
@spec url(t(), Electric.Client.Fetch.Request.attrs()) :: binary()
```

Return an authenticated URL for the given request attributes.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
