Phoenix.Sync.Shape (Phoenix.Sync v0.6.1)
Materialize a shape stream into memory and subscribe to changes.
Add a Phoenix.Sync.Shape to your supervision tree to subscribe to a
sync stream and maintain a synchronized local copy of the data in-memory.

    defmodule MyApp.Application do
      use Application

      def start(_type, _args) do
        children = [
          {Phoenix.Sync.Shape, [MyApp.Todo, name: MyApp.TodoShape]}
        ]

        opts = [strategy: :one_for_one, name: MyApp.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end

Or use start_link/1 or start_link/2 to start a shape process manually:

    {:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo)
    {:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, replica: :full)

This allows you to subscribe to the shape, receive change notifications, and retrieve the current dataset.
to_list/2 will return the current state of the shape as a list:

    Phoenix.Sync.Shape.to_list(MyApp.TodoShape)
    [
      {~s|"public"."todos"/"1"|, %MyApp.Todo{id: 1}},
      {~s|"public"."todos"/"2"|, %MyApp.Todo{id: 2}},
      # ...
    ]

subscribe/2 registers the current process to receive change events (and
unsubscribe/1 allows you to stop receiving them):

    # use the registered `name` of the shape to receive change events
    # within a GenServer or other process
    def init(_args) do
      ref = Phoenix.Sync.Shape.subscribe(MyApp.TodoShape)
      # the subscription reference is kept as the process state
      {:ok, ref}
    end

    # handle the shape events...
    def handle_info({:sync, ref, {operation, {key, _value}}}, ref)
        when operation in [:insert, :update, :delete] do
      IO.inspect([{operation, key}])
      {:noreply, ref}
    end

    def handle_info({:sync, ref, control}, ref)
        when control in [:up_to_date, :must_refetch] do
      IO.inspect(control: control)
      {:noreply, ref}
    end

Keys
The sync stream gives every value in the source table a unique key which is
defined as the full table name (with namespace) and the primary key value,
e.g. ~s|"public"."todos"/"1"|.
If a table has a composite primary key, the key will include all primary key
values, e.g. ~s|"public"."todos"/"1"/"123"|.
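
The key layout described above can be taken apart with plain string functions. The following is a minimal sketch, not part of Phoenix.Sync: the `KeyParser` module and its `parse/1` function are hypothetical names, and the code assumes that namespace, table name and primary key values contain no `/`, `.` or embedded double quotes.

```elixir
defmodule KeyParser do
  # Hypothetical helper for illustration only; not part of Phoenix.Sync.
  # Assumes names and values contain no "/", "." or embedded double quotes.

  @doc "Split a sync key into {namespace, table, primary_key_values}."
  def parse(key) when is_binary(key) do
    # The first segment is the qualified table name, the rest are primary key values.
    [relation | pk_parts] = String.split(key, "/")

    [namespace, table] =
      relation
      |> String.split(".")
      |> Enum.map(&strip_quotes/1)

    {namespace, table, Enum.map(pk_parts, &strip_quotes/1)}
  end

  # Remove the surrounding double quotes from a single segment.
  defp strip_quotes(part), do: String.trim(part, "\"")
end
```

Primary key values come back as strings, so a caller that needs integers would have to convert them itself.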
Types
@type control() :: :up_to_date | :must_refetch
@type enum_opts() :: [{:keys, boolean()}]
@type key() :: String.t()
@type operation() :: :insert | :update | :delete
@type shape() :: GenServer.server()
@type shape_options() :: [Phoenix.Sync.queryable() | stream_options()]
@type stream_options() :: [
  {:table, binary()}
  | {:where, nil | binary()}
  | {:columns, nil | [binary()]}
  | {:namespace, nil | binary()}
  | {:params, nil | %{optional(pos_integer()) => binary() | integer() | float() | boolean()} | [binary() | integer() | float() | boolean()]}
  | {:replica, term()}
  | {:transform, {module(), atom(), [term()]} | (term() -> term()) | atom()}
  | {:name, GenServer.name()}
]
@type subscribe_opts() :: [only: [subscription_msg_type()], tag: term()]
@type tag() :: term()
@type value() :: term()
Functions
@spec find(shape(), Enum.default(), (value() -> any())) :: value() | Enum.default()
A wrapper around Enum.find/3 that searches for a value in the shape.
The matcher function is only passed the shape value, not the key.
@spec start_link(shape_options()) :: GenServer.on_start()
Start a new shape process that will receive the sync stream events and maintain an in-memory copy of the dataset in sync with Postgres.
Options
Shapes are defined exactly as for Phoenix.Sync.Client.stream/2:

    # using an `Ecto.Schema` module
    {:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo)

    # or a full `Ecto.Query`
    {:ok, pid} = Phoenix.Sync.Shape.start_link(
      from(t in MyApp.Todo, where: t.completed == true)
    )

    # we can pass extra sync options
    {:ok, pid} = Phoenix.Sync.Shape.start_link(
      from(t in MyApp.Todo, where: t.completed == true),
      replica: :full
    )

but also accept a :name much like other GenServer processes.

    {:ok, pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: TodosShape)

To start a Shape within a supervision tree, you pass the options as the child spec as if they were arguments:

    children = [
      {Phoenix.Sync.Shape, [MyApp.Todo, name: TodoShape]}
    ]

Options
- :where - Filter the table according to the where clause. The default value is nil.
- :columns - List of columns to include in the shape. Must include all primary keys. If nil this is equivalent to all columns (SELECT *). The default value is nil.
- :namespace - The namespace the table belongs to. If nil then Postgres will use whatever schema is the default (usually public). The default value is nil.
- :params - Values of positional parameters in the where clause. These will substitute $i placeholders in the where clause. The default value is nil.
- :replica - Modifies the data sent in update and delete change messages. When set to :full the entire row will be sent for updates and deletes, not just the changed columns. The default value is :default.
- :parser - A {module, args} tuple specifying the Electric.Client.ValueMapper implementation to use for mapping values from the sync stream into Elixir terms. The default value is {Electric.Client.ValueMapper, []}.
@spec start_link(String.t() | Phoenix.Sync.queryable(), stream_options()) :: GenServer.on_start()
See start_link/1.
Return a lazy stream of the current values in the shape.

    {:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)
    stream = Phoenix.Sync.Shape.stream(:todos)
    Enum.into(stream, %{})
    %{
      ~s|"public"."todos"/"1"| => %MyApp.Todo{id: 1},
      ~s|"public"."todos"/"2"| => %MyApp.Todo{id: 2},
      # ...
    }

Options

- keys (default: true) - if set to false, returns only the values without keys.
- batch_size (default: 100) - the number of rows to fetch from the backing ETS table in each iteration. Larger values are more efficient but will use more memory.
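
Because the result is lazy, it can be combined with `Stream` functions so that only as many rows as needed are read. The sketch below is an illustration under assumptions: `StreamSketch` is a hypothetical module, the todos are plain maps rather than `%MyApp.Todo{}` structs, and any enumerable of `{key, value}` tuples stands in for the stream returned for a shape.

```elixir
defmodule StreamSketch do
  # Hypothetical helper for illustration only; not part of Phoenix.Sync.
  # `entries` is any enumerable of {key, value} tuples, which is the
  # element format shown above when keys are included.

  @doc "Return the ids of up to `limit` completed todos, evaluated lazily."
  def completed_ids(entries, limit) do
    entries
    |> Stream.filter(fn {_key, todo} -> todo.completed end)
    |> Stream.map(fn {_key, todo} -> todo.id end)
    # Enum.take/2 stops consuming the enumerable once `limit` items are found.
    |> Enum.take(limit)
  end
end
```

With a real shape, the first argument would be the stream obtained for that shape instead of a list.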
@spec subscribe(shape(), subscribe_opts()) :: reference()
Subscribe the current process to the given shape and receive notifications about new changes on the stream.

    # create a shape that holds all the completed todos
    {:ok, shape} = Phoenix.Sync.Shape.start_link(from(t in Todo, where: t.completed == true))

    # subscribe this process to the shape events
    ref = Phoenix.Sync.Shape.subscribe(shape)

    receive do
      {:sync, ^ref, {:insert, {_key, todo}}} ->
        # a todo has been marked as completed
        IO.inspect(todo, label: "insert")

      {:sync, ^ref, {:delete, {_key, todo}}} ->
        # a todo has been deleted or moved from completed to not completed
        IO.inspect(todo, label: "delete")

      {:sync, ^ref, {:update, {_key, todo}}} ->
        # a completed todo has been changed, e.g. its title
        IO.inspect(todo, label: "update")

      {:sync, ^ref, :up_to_date} ->
        # the local shape is fully synchronized with the remote server
        :ok

      {:sync, ^ref, :must_refetch} ->
        # the local data has been invalidated and needs to be refetched
        :ok
    end

In the example above, changes in the completed state of a todo will move
the affected item into or out of the shape because it's built using a filter
that only allows completed todos. These moves into or out of the shape are
mapped to :insert and :delete events, respectively.
The message format is {tag, reference, payload}:

- tag defaults to :sync but can be customized using the :tag option (see below).
- reference is the unique reference for the subscription returned by subscribe/2.
- type is the operation type, which can be one of :insert, :update, :delete, :up_to_date, or :must_refetch.
  - :up_to_date indicates that the shape is fully synchronized.
  - :must_refetch indicates that the shape needs to be refetched due to a schema change, table truncation or some other server-side change.
- payload depends on the operation type:
  - For :insert, :update and :delete, it is a tuple {operation, {key, value}} where key is a unique identifier for the value and value is the actual data.
  - For :up_to_date and :must_refetch, it is just the operation.
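
The message format above maps naturally onto function clauses. As a minimal sketch, the hypothetical `LocalCopy` module below folds subscription messages into a plain map keyed by the sync key. It is not part of Phoenix.Sync, and clearing the map on `:must_refetch` is an assumption about how a subscriber might react, not documented library behaviour.

```elixir
defmodule LocalCopy do
  # Hypothetical helper for illustration only; not part of Phoenix.Sync.

  @doc "Apply one {tag, reference, payload} message to a map of key => value."
  def apply_message({_tag, _ref, {op, {key, value}}}, data) when op in [:insert, :update] do
    Map.put(data, key, value)
  end

  def apply_message({_tag, _ref, {:delete, {key, _value}}}, data) do
    Map.delete(data, key)
  end

  # Assumption: treat an invalidated shape as an empty local copy.
  def apply_message({_tag, _ref, :must_refetch}, _data), do: %{}

  # Nothing to change once the shape reports it is fully synchronized.
  def apply_message({_tag, _ref, :up_to_date}, data), do: data

  @doc "Fold a list of messages into the map, in order."
  def apply_all(messages, data \\ %{}) do
    Enum.reduce(messages, data, &apply_message/2)
  end
end
```

In a GenServer, `apply_message/2` could be called from `handle_info/2` with the process state as the second argument.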
Options
- only - Limit events to only a subset of the available message types (see above). If not provided, all message types will be sent. If you want to receive all :insert, :update and :delete events, you can use only: :changes.
- tag - Change the tag of the message sent to the subscriber. Defaults to :sync.
Examples

    # only receive notification when the shape is fully synchronized
    ref = Phoenix.Sync.Shape.subscribe(query, only: [:up_to_date])

    # only receive notification of deletes
    ref = Phoenix.Sync.Shape.subscribe(query, only: [:delete])

    # only receive notification when the shape has been invalidated and needs
    # to be refetched. Give the message a custom tag to make it easier to
    # identify the originating shape.
    ref = Phoenix.Sync.Shape.subscribe(Todo, only: [:must_refetch], tag: {:sync, :todo})

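
Since the tag is the first element of the message tuple, a custom tag such as `{:sync, :todo}` can be matched directly to tell shapes apart. The sketch below uses a hypothetical `TagRouter` module (not part of Phoenix.Sync) and assumes tags of the form `{:sync, source}` alongside the default `:sync` tag.

```elixir
defmodule TagRouter do
  # Hypothetical helper for illustration only; not part of Phoenix.Sync.

  @doc "Return {source, payload} for a {tag, reference, payload} message."
  # Custom tag of the form {:sync, source}, as in the last example above.
  def route({{:sync, source}, ref, payload}) when is_reference(ref) do
    {source, payload}
  end

  # Default tag: no source information is carried in the message.
  def route({:sync, ref, payload}) when is_reference(ref) do
    {:default, payload}
  end
end
```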
Get a list of the current subscribers to the given shape.
Get all values in the shape.
Returns a list of {key, value} tuples, where key is a unique identifier
for the value.
Options
- keys (default: true) - if set to false, returns only the values without keys.
Get all data in the shape as a map.
The keys of the map will be the sync stream keys.

    {:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)
    Phoenix.Sync.Shape.to_map(:todos)
    %{
      ~s|"public"."todos"/"1"| => %MyApp.Todo{id: 1},
      ~s|"public"."todos"/"2"| => %MyApp.Todo{id: 2},
      # ...
    }

Get all data in the shape as a map, transforming the keys and values.

    {:ok, _pid} = Phoenix.Sync.Shape.start_link(MyApp.Todo, name: :todos)
    Phoenix.Sync.Shape.to_map(:todos, fn {_key, %MyApp.Todo{id: id, title: title}} ->
      {id, title}
    end)
    %{
      1 => "my first todo",
      2 => "my second todo",
      3 => "my third todo"
    }

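
The transform function in the example above takes a `{key, value}` tuple and returns a `{new_key, new_value}` tuple, which is the same contract as the function given to `Map.new/2`. The sketch below shows that contract on stand-in data. `ToMapSketch` is a hypothetical module, the todos are plain maps rather than `%MyApp.Todo{}` structs, and whether the library builds its map this way internally is an assumption.

```elixir
defmodule ToMapSketch do
  # Hypothetical helper for illustration only; not part of Phoenix.Sync.

  @doc "Build an id => title map from an enumerable of {key, todo} tuples."
  def titles_by_id(entries) do
    # With Map.new/2, a later entry overwrites an earlier one that
    # produces the same new key.
    Map.new(entries, fn {_key, %{id: id, title: title}} -> {id, title} end)
  end
end
```

If the transform can produce the same key for two rows, only one of them survives in the resulting map, so the new key should be unique per row.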
@spec unsubscribe(shape()) :: :ok
Unsubscribe the current process from the given shape.