Phoenix.Sync.Shape (Phoenix.Sync v0.5.1)

View Source

Materialize a shape stream into memory and subscribe to changes.

Add a Phoenix.Sync.Shape to your supervision tree to subscribe to a sync stream and maintain a synchronized local copy of the data in-memory.

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Phoenix.Sync.Shape, [MyApp.Todo, name: MyApp.TodoShape]}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Or use start_link/1 or start_link/2 to start a shape process manually:

{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo)

{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, replica: :full)

This allows you to subscribe to the shape and receive notifications and also retrieve the current dataset.

to_list/2 will return the current state of the shape as a list:

Phoenix.Sync.Shape.to_list(MyApp.TodoShape)
[
  {~s|"public"."todos"/"1"|, %MyApp.Todo{id: 1}},
  {~s|"public"."todos"/"2"|, %MyApp.Todo{id: 2}},
  # ...
]

subscribe/2 registers the current process to receive change events (and unsubscribe/1 allows you to stop receiving them):

# use the registered `name` of the shape to receive change events
# within a GenServer or other process
def init(_args) do
  ref = Phoenix.Sync.Shape.subscribe(MyApp.TodoShape)
  {:ok, ref}
end

# handle the shape events...
def handle_info({:sync, ref, {operation, {key, _value}}, ref)
    when operation in [:insert, :update, :delete] do
  IO.inspect([{operation, key}])
  {:noreply, state}
end

def handle_info({:sync, ref, control}, ref)
    when control in [:up_to_date, :must_refetch] do
  IO.inspect(control: control)
  {:noreply, state}
end

Keys

The sync stream gives every value in the source table a unique key which is defined as the full table name (with namespace) and the primary key value, e.g. ~s|"public"."todos"/"1"|.

If a table has a composite primary key, the key will include all primary key values. e.g. ~s|"public"."todos"/"1"/"123"|.

Summary

Functions

A wrapper around Enum.find/3 that searches for a value in the shape.

Start a new shape process that will receive the sync stream events and maintain an in-memory copy of the dataset in sync with Postgres.

Return a lazy stream of the current values in the shape.

Subscribe the current process to the given shape and receive notifications about new changes on the stream.

Get a list of the current subscribers to the given shape.

Get all values in the shape.

Get all data in the shape as a map.

Get all data in the shape as a map, transforming the keys and values.

Unsubscribe the current process from the given shape.

Types

control()

@type control() :: :up_to_date | :must_refetch

enum_opts()

@type enum_opts() :: [{:keys, boolean()}]

event()

@type event() :: {tag(), reference(), payload()}

key()

@type key() :: String.t()

operation()

@type operation() :: :insert | :update | :delete

payload()

@type payload() :: {operation(), {key(), value()}} | control()

shape()

@type shape() :: GenServer.server()

shape_options()

@type shape_options() :: [Phoenix.Sync.queryable() | stream_options()]

stream_options()

@type stream_options() :: [
  ({:table, binary()}
   | {:where, nil | binary()}
   | {:columns, nil | [binary()]}
   | {:namespace, nil | binary()}
   | {:params,
      nil
      | %{optional(pos_integer()) => binary() | integer() | float() | boolean()}
      | [binary() | integer() | float() | boolean()]}
   | {:replica, term()})
  | {:name, GenServer.name()}
]

subscribe_opts()

@type subscribe_opts() :: [only: [subscription_msg_type()], tag: term()]

subscription_msg_type()

@type subscription_msg_type() :: operation() | control()

tag()

@type tag() :: term()

value()

@type value() :: term()

Functions

find(shape, default \\ nil, matcher)

@spec find(shape(), Enum.default(), (value() -> any())) :: value() | Enum.default()

A wrapper around Enum.find/3 that searches for a value in the shape.

The matcher function is only passed the shape value, not the key.

start_link(args)

@spec start_link(shape_options()) :: GenServer.on_start()

Start a new shape process that will receive the sync stream events and maintain an in-memory copy of the dataset in sync with Postgres.

Options

Shapes are defined exactly as for Phoenix.Sync.Client.stream/2:

# using an `Ecto.Schema` module
{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo)

# or a full `Ecto.Query`
{:ok, pid} = Phoenix.Sync.Shape.start_link(
  from(t in MyApp.Todo, where: t.completed == true)
)

# we can pass extra sync options
{:ok, pid} = Phoenix.Sync.Shape.start_link(
  from(t in MyApp.Todo, where: t.completed == true),
  replica: :full
)

but also accept a :name much like other GenServer processes.

{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: TodosShape)

To start a Shape within a supervision tree, you pass the options as the child spec as if they were arguments:

children = [
  {Phoenix.Sync.Shape, [MyApp.Todo, name: TodoShape]}
]

Options

  • :where - Filter the table according to the where clause. The default value is nil.

  • :columns - List of columns to include in the shape. Must include all primary keys. If nil this is equivalent to all columns (SELECT *) The default value is nil.

  • :namespace - The namespace the table belongs to. If nil then Postgres will use whatever schema is the default (usually public). The default value is nil.

  • :params - Values of positional parameters in the where clause. These will substitute $i placeholder in the where clause. The default value is nil.

  • :replica - Modifies the data sent in update and delete change messages.

    When set to :full the entire row will be sent for updates and deletes, not just the changed columns.

    The default value is :default.

  • :parser - A {module, args} tuple specifying the Electric.Client.ValueMapper implementation to use for mapping values from the sync stream into Elixir terms. The default value is {Electric.Client.ValueMapper, []}.

start_link(queryable, opts)

See start_link/1.

stream(shape, opts \\ [])

@spec stream(shape(), opts :: enum_opts()) :: Enumerable.t({key(), value()} | value())

Return a lazy stream of the current values in the shape.

{:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)

stream = Phoenix.Sync.Shape.stream(:todos)

Enum.into(stream, %{})
%{
  ~s|"public"."todos"/"1"| => %MyApp.Todo{id: 1},
  ~s|"public"."todos"/"2"| => %MyApp.Todo{id: 2},
  # ...
}

Options

  • keys (default: true) - if set to false, returns only the values without keys.
  • batch_size (default: 100) - the number of rows to fetch from the backing ETS table in each iteration. Larger values are more efficient but will use more memory.

subscribe(shape, opts \\ [])

@spec subscribe(shape(), subscribe_opts()) :: reference()

Subscribe the current process to the given shape and receive notifications about new changes on the stream.

# create a shape that holds all the completed todos
{:ok, shape} = Phoenix.Sync.Shape.start_link(from(t in Todo, where: t.completed == true))

# subscribe this process to the shape events
ref = Phoenix.Sync.Shape.subscribe(shape)

receive do
  {:sync, ^ref, {:insert, {_key, todo}} ->
    # a todo has been marked as completed
  {:sync, ^ref, {:delete, {_key, todo}} ->
    # a todo has been deleted or moved from completed to not completed
  {:sync, ^ref, {:update, {_key, todo}} ->
    # the todo's title has been changed
  {:sync, ^ref, :up_to_date} ->
    # the local shape is fully synchronized with the remote server
  {:sync, ^ref, :must_refetch} ->
    # the local data has been invalidated and needs to be refetched
end

In the example above, changes in the completed state of a todo will move the affected item into or out of the shape because it's built using a filter that only allows completed todos. These moves into or out of the shape are mapped to :insert and :delete events, respectively.

The message format is {tag, reference, payload}

  • tag defaults to :sync but can be customized using the :tag option (see below).

  • reference is the unique reference for the subscription returned by subscribe/2.

  • type is the operation type, which can be one of :insert, :update, :delete, :up_to_date, or :must_refetch.

    • :up_to_date indicates that the shape is fully synchronized.
    • :must_refetch indicates that the shape needs to be refetched due to a schema change, table truncation or some other server-side change.
  • payload depends on the operation type:

    • For :insert, :update and :delete, it is a tuple {operation, {key, value}} where key is a unique identifier for the value and value is the actual data.
    • For :up_to_date and :must_refetch, it is just the operation.

Options

  • only - Limit events to only a subset of the available message types (see above). If not provided, all message types will be sent. If you want to receive all :insert, :update and :delete events, you can use only: :changes.

  • tag - change the tag of the message sent to subscriber. Defaults to :sync.

Examples

# only receive notification when the shape is fully synchronized
ref = Phoenix.Sync.Shape.subscribe(query, only: [:up_do_date])

# only receive notification of deletes
ref = Phoenix.Sync.Shape.subscribe(query, only: [:delete])

# only receive notification when the shape has been invalidated and needs
# to be refetched. Give the message a custom tag to make it easier to
# identify the originating shape.
ref = Phoenix.Sync.Shape.subscribe(Todo, only: [:must_refetch], tag: {:sync, :todo})

subscribers(shape)

@spec subscribers(shape()) :: [pid()]

Get a list of the current subscribers to the given shape.

to_list(shape, opts \\ [])

@spec to_list(shape(), opts :: enum_opts()) :: [{key(), value()}] | [value()]

Get all values in the shape.

Returns a list of {key, value} tuples, where key is a unique identifier for the value.

Options

  • keys (default: true) - if set to false, returns only the values without keys.

to_map(shape)

@spec to_map(shape()) :: map()

Get all data in the shape as a map.

The keys of the map will be the sync stream keys.

{:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)

Phoenix.Sync.Shape.to_map(:todos)
%{
  ~s|"public"."todos"/"1"| => %MyApp.Todo{id: 1},
  ~s|"public"."todos"/"2"| => %MyApp.Todo{id: 2},
  # ...
}

to_map(shape, transform)

@spec to_map(shape(), ({key(), value()} -> {Map.key(), Map.value()})) :: map()

Get all data in the shape as a map, transforming the keys and values.

{:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)

Phoenix.Sync.Shape.to_map(:todos, fn {_key, %MyApp.Todo{id: id, title: title} ->
  {id, title}
end)
%{
  1 => "my first todo",
  2 => "my second todo",
  2 => "my third todo",
}

unsubscribe(shape)

@spec unsubscribe(shape()) :: :ok

Unsubscribe the current process from the given shape.