Phoenix.Sync.Shape (Phoenix.Sync v0.5.1)
View SourceMaterialize a shape stream into memory and subscribe to changes.
Add a Phoenix.Sync.Shape
to your supervision tree to subscribe to a
sync stream and maintain a synchronized local copy of the data in-memory.
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
{Phoenix.Sync.Shape, [MyApp.Todo, name: MyApp.TodoShape]}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Or use start_link/1
or start_link/2
to start a shape process manually:
{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo)
{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, replica: :full)
This allows you to subscribe to the shape and receive notifications and also retrieve the current dataset.
to_list/2
will return the current state of the shape as a list:
Phoenix.Sync.Shape.to_list(MyApp.TodoShape)
[
{~s|"public"."todos"/"1"|, %MyApp.Todo{id: 1}},
{~s|"public"."todos"/"2"|, %MyApp.Todo{id: 2}},
# ...
]
subscribe/2
registers the current process to receive change events (and
unsubscribe/1
allows you to stop receiving them):
# use the registered `name` of the shape to receive change events
# within a GenServer or other process
def init(_args) do
ref = Phoenix.Sync.Shape.subscribe(MyApp.TodoShape)
{:ok, ref}
end
# handle the shape events...
def handle_info({:sync, ref, {operation, {key, _value}}, ref)
when operation in [:insert, :update, :delete] do
IO.inspect([{operation, key}])
{:noreply, state}
end
def handle_info({:sync, ref, control}, ref)
when control in [:up_to_date, :must_refetch] do
IO.inspect(control: control)
{:noreply, state}
end
Keys
The sync stream gives every value in the source table a unique key which is
defined as the full table name (with namespace) and the primary key value,
e.g. ~s|"public"."todos"/"1"|
.
If a table has a composite primary key, the key will include all primary key
values. e.g. ~s|"public"."todos"/"1"/"123"|
.
Summary
Functions
A wrapper around Enum.find/3
that searches for a value in the shape.
Start a new shape process that will receive the sync stream events and maintain an in-memory copy of the dataset in sync with Postgres.
Return a lazy stream of the current values in the shape.
Subscribe the current process to the given shape and receive notifications about new changes on the stream.
Get a list of the current subscribers to the given shape.
Get all values in the shape.
Get all data in the shape as a map.
Get all data in the shape as a map, transforming the keys and values.
Unsubscribe the current process from the given shape.
Types
@type control() :: :up_to_date | :must_refetch
@type enum_opts() :: [{:keys, boolean()}]
@type key() :: String.t()
@type operation() :: :insert | :update | :delete
@type shape() :: GenServer.server()
@type shape_options() :: [Phoenix.Sync.queryable() | stream_options()]
@type stream_options() :: [ ({:table, binary()} | {:where, nil | binary()} | {:columns, nil | [binary()]} | {:namespace, nil | binary()} | {:params, nil | %{optional(pos_integer()) => binary() | integer() | float() | boolean()} | [binary() | integer() | float() | boolean()]} | {:replica, term()}) | {:name, GenServer.name()} ]
@type subscribe_opts() :: [only: [subscription_msg_type()], tag: term()]
@type tag() :: term()
@type value() :: term()
Functions
@spec find(shape(), Enum.default(), (value() -> any())) :: value() | Enum.default()
A wrapper around Enum.find/3
that searches for a value in the shape.
The matcher
function is only passed the shape value, not the key.
@spec start_link(shape_options()) :: GenServer.on_start()
Start a new shape process that will receive the sync stream events and maintain an in-memory copy of the dataset in sync with Postgres.
Options
Shapes are defined exactly as for Phoenix.Sync.Client.stream/2
:
# using an `Ecto.Schema` module
{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo)
# or a full `Ecto.Query`
{:ok, pid} = Phoenix.Sync.Shape.start_link(
from(t in MyApp.Todo, where: t.completed == true)
)
# we can pass extra sync options
{:ok, pid} = Phoenix.Sync.Shape.start_link(
from(t in MyApp.Todo, where: t.completed == true),
replica: :full
)
but also accept a :name
much like other GenServer
processes.
{:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: TodosShape)
To start a Shape within a supervision tree, you pass the options as the child spec as if they were arguments:
children = [
{Phoenix.Sync.Shape, [MyApp.Todo, name: TodoShape]}
]
Options
:where
- Filter the table according to the where clause. The default value isnil
.:columns
- List of columns to include in the shape. Must include all primary keys. Ifnil
this is equivalent to all columns (SELECT *
) The default value isnil
.:namespace
- The namespace the table belongs to. Ifnil
then Postgres will use whatever schema is the default (usuallypublic
). The default value isnil
.:params
- Values of positional parameters in the where clause. These will substitute$i
placeholder in the where clause. The default value isnil
.:replica
- Modifies the data sent in update and delete change messages.When set to
:full
the entire row will be sent for updates and deletes, not just the changed columns.The default value is
:default
.:parser
- A{module, args}
tuple specifying theElectric.Client.ValueMapper
implementation to use for mapping values from the sync stream into Elixir terms. The default value is{Electric.Client.ValueMapper, []}
.
@spec start_link(String.t() | Phoenix.Sync.queryable(), stream_options()) :: GenServer.on_start()
See start_link/1
.
Return a lazy stream of the current values in the shape.
{:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)
stream = Phoenix.Sync.Shape.stream(:todos)
Enum.into(stream, %{})
%{
~s|"public"."todos"/"1"| => %MyApp.Todo{id: 1},
~s|"public"."todos"/"2"| => %MyApp.Todo{id: 2},
# ...
}
Options
keys
(default:true
) - if set tofalse
, returns only the values without keys.batch_size
(default:100
) - the number of rows to fetch from the backing ETS table in each iteration. Larger values are more efficient but will use more memory.
@spec subscribe(shape(), subscribe_opts()) :: reference()
Subscribe the current process to the given shape and receive notifications about new changes on the stream.
# create a shape that holds all the completed todos
{:ok, shape} = Phoenix.Sync.Shape.start_link(from(t in Todo, where: t.completed == true))
# subscribe this process to the shape events
ref = Phoenix.Sync.Shape.subscribe(shape)
receive do
{:sync, ^ref, {:insert, {_key, todo}} ->
# a todo has been marked as completed
{:sync, ^ref, {:delete, {_key, todo}} ->
# a todo has been deleted or moved from completed to not completed
{:sync, ^ref, {:update, {_key, todo}} ->
# the todo's title has been changed
{:sync, ^ref, :up_to_date} ->
# the local shape is fully synchronized with the remote server
{:sync, ^ref, :must_refetch} ->
# the local data has been invalidated and needs to be refetched
end
In the example above, changes in the completed
state of a todo will move
the affected item into or out of the shape because it's built using a filter
that only allows completed todos. These moves into or out of the shape are
mapped to :insert
and :delete
events, respectively.
The message format is {tag, reference, payload}
tag
defaults to:sync
but can be customized using the:tag
option (see below).reference
is the unique reference for the subscription returned bysubscribe/2
.type
is the operation type, which can be one of:insert
,:update
,:delete
,:up_to_date
, or:must_refetch
.:up_to_date
indicates that the shape is fully synchronized.:must_refetch
indicates that the shape needs to be refetched due to a schema change, table truncation or some other server-side change.
payload
depends on the operation type:- For
:insert
,:update
and:delete
, it is a tuple{operation, {key, value}}
wherekey
is a unique identifier for the value andvalue
is the actual data. - For
:up_to_date
and:must_refetch
, it is just the operation.
- For
Options
only
- Limit events to only a subset of the available message types (see above). If not provided, all message types will be sent. If you want to receive all:insert
,:update
and:delete
events, you can useonly: :changes
.tag
- change the tag of the message sent to subscriber. Defaults to:sync
.
Examples
# only receive notification when the shape is fully synchronized
ref = Phoenix.Sync.Shape.subscribe(query, only: [:up_do_date])
# only receive notification of deletes
ref = Phoenix.Sync.Shape.subscribe(query, only: [:delete])
# only receive notification when the shape has been invalidated and needs
# to be refetched. Give the message a custom tag to make it easier to
# identify the originating shape.
ref = Phoenix.Sync.Shape.subscribe(Todo, only: [:must_refetch], tag: {:sync, :todo})
Get a list of the current subscribers to the given shape.
Get all values in the shape.
Returns a list of {key, value}
tuples, where key
is a unique identifier
for the value.
Options
keys
(default:true
) - if set to false, returns only the values without keys.
Get all data in the shape as a map.
The keys of the map will be the sync stream keys.
{:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)
Phoenix.Sync.Shape.to_map(:todos)
%{
~s|"public"."todos"/"1"| => %MyApp.Todo{id: 1},
~s|"public"."todos"/"2"| => %MyApp.Todo{id: 2},
# ...
}
Get all data in the shape as a map, transforming the keys and values.
{:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)
Phoenix.Sync.Shape.to_map(:todos, fn {_key, %MyApp.Todo{id: id, title: title} ->
{id, title}
end)
%{
1 => "my first todo",
2 => "my second todo",
2 => "my third todo",
}
@spec unsubscribe(shape()) :: :ok
Unsubscribe the current process from the given shape.