Phoenix.Sync (Phoenix.Sync v0.6.1)
Real-time sync for Postgres-backed Phoenix applications.
See the docs for more information.
Summary
Functions
Interrupts all long-polling requests matching the given shape definition.
Returns the required adapter configuration for your Phoenix Endpoint or
Plug.Router.
Returns a shape definition for the given params.
Types
@type param_overrides() :: [param_override()]
@type queryable() :: Ecto.Queryable.t() | Ecto.Schema.t() | Ecto.Changeset.t()
@type shape_definition() :: String.t() | queryable() | shape_specification()
@type shape_specification() :: [
  table: binary(),
  where: nil | binary(),
  columns: nil | [binary()],
  namespace: nil | binary(),
  params:
    nil
    | %{optional(pos_integer()) => binary() | integer() | float() | boolean()}
    | [binary() | integer() | float() | boolean()],
  replica: term(),
  transform: {module(), atom(), [term()]} | (term() -> term()) | atom()
]
Functions
@spec interrupt( shape_definition() | (match_shape_params() -> boolean()), shape_specification() ) :: {:ok, non_neg_integer()}
Interrupts all long-polling requests matching the given shape definition.
The broader the shape definition, the more requests will be interrupted.
Returns the number of interrupted requests.
Examples
To interrupt all shapes on the todos table:
Phoenix.Sync.interrupt("todos")
# or
Phoenix.Sync.interrupt(table: "todos")
or the same using an Ecto.Schema module:
Phoenix.Sync.interrupt(Todos.Todo)
To interrupt all shapes with the given parameterized where clause:
Phoenix.Sync.interrupt(table: "todos", where: "user_id = $1")
or a single shape for the given user:
Phoenix.Sync.interrupt(
  from(t in Todos.Todo, where: t.user_id == ^user_id)
)
# or
Phoenix.Sync.interrupt(
  table: "todos",
  where: "user_id = $1",
  params: [user_id]
)
# or
Phoenix.Sync.interrupt(
  table: "todos",
  where: "user_id = '#{user_id}'"
)
If you want more control over the match, you can pass a function that will
receive a normalized shape definition and should return true if the active
shape matches.
  Phoenix.Sync.interrupt(fn %{table: _, where: _, params: _} = shape ->
    shape.table == "todos" &&
      shape.where == "user_id = $1" &&
      shape.params["0"] == user_id
  end)
The normalized shape argument is a map with the following keys:
- table, e.g. "todos"
- namespace, e.g. "public"
- where, e.g. "where user_id = $1"
- params, a map of argument position to argument value, e.g. %{"0" => "true", "1" => "..."}
- columns, e.g. ["id", "title"]
All except table may be nil.
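As a rough illustration of the function form, the sketch below builds a matcher that selects the todos shapes of several users at once. The `InterruptMatcherSketch` module and its `todos_for_users/1` helper are hypothetical names, not part of Phoenix.Sync, and the sample shape maps are made up. It assumes that parameter values arrive as strings, as in the `%{"0" => "true"}` example above.

```elixir
defmodule InterruptMatcherSketch do
  # Hypothetical helper (not part of Phoenix.Sync): returns a predicate over
  # normalized shape maps that matches the "todos" shapes of the given users.
  def todos_for_users(user_ids) do
    # Assumption: params values are strings, so compare against stringified ids.
    ids = MapSet.new(user_ids, &to_string/1)

    fn %{table: table} = shape ->
      # Every key except :table may be nil, so guard before reading params.
      params = Map.get(shape, :params) || %{}

      table == "todos" and
        Map.get(shape, :where) == "user_id = $1" and
        MapSet.member?(ids, params["0"])
    end
  end
end

matcher = InterruptMatcherSketch.todos_for_users([1, 2])

# Made-up normalized shapes, used only to exercise the predicate locally.
shapes = [
  %{table: "todos", namespace: nil, where: "user_id = $1", params: %{"0" => "1"}, columns: nil},
  %{table: "todos", namespace: nil, where: nil, params: nil, columns: nil},
  %{table: "projects", namespace: "public", where: "user_id = $1", params: %{"0" => "2"}, columns: nil}
]

matched = Enum.filter(shapes, matcher)

# In an application the predicate would be handed to the library instead:
# Phoenix.Sync.interrupt(matcher)
```

Checking the predicate against plain maps like this is a cheap way to confirm that it is neither too broad nor too narrow before using it in an interrupt call.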
Interrupting Ecto Query-based Shapes
Be careful when mixing Ecto query-based shapes with interrupt calls using
hand-written where clauses.
The shape
Phoenix.Sync.Controller.sync_stream(conn, params, fn ->
  from(t in Todos.Todo, where: t.user_id == ^user_id)
end)
will not be interrupted by
Phoenix.Sync.interrupt(
  table: "todos",
  where: "user_id = '#{user_id}'"
)
because where clause matching is a simple exact string match, and where clauses
generated from Ecto queries will generally differ from the equivalent
hand-written version. If you want to interrupt a query-based shape you should
use the same query as the interrupt criteria.
Writing interrupts
It's better to be too broad with your interrupt calls than too narrow.
Only clients whose shape definition changes after the interrupt/1 call
will be affected.
Supported options
The more options you give the more specific the interrupt call will be. Only the table name is required.
- table - Required. Interrupts all shapes matching the given table. E.g. "todos"
- namespace - The table namespace. E.g. "public"
- where - The shape's where clause. Can be parameterized, in which case it will match all shapes with the same where filter irrespective of the parameters (unless provided). E.g. "status = $1", "completed = true"
- columns - The columns included in the shape. E.g. ["id", "title", "completed"]
- params - The values associated with a parameterized where clause. E.g. [true, 1, "alive"], %{1 => true}
plug_opts()
Returns the required adapter configuration for your Phoenix Endpoint or
Plug.Router.
Phoenix
Configure your endpoint at runtime by passing the phoenix_sync configuration
to it in the Application.start/2 callback:
def start(_type, _args) do
  children = [
    # ...
    {MyAppWeb.Endpoint, phoenix_sync: Phoenix.Sync.plug_opts()}
  ]
end
Plug
Add the configuration to the Plug opts in your server configuration:
children = [
  {Bandit, plug: {MyApp.Router, phoenix_sync: Phoenix.Sync.plug_opts()}}
]
Your Plug.Router must be configured with
copy_opts_to_assign and you should use the relevant Phoenix.Sync modules:
defmodule MyApp.Router do
  use Plug.Router, copy_opts_to_assign: :options
  use Phoenix.Sync.Controller
  use Phoenix.Sync.Router
  # needed for the from/2 query in the route below
  import Ecto.Query, only: [from: 2]
  plug :match
  plug :dispatch
  sync "/shapes/todos", Todos.Todo
  get "/shapes/user-todos" do
    %{"user_id" => user_id} = conn.params
    sync_render(conn, from(t in Todos.Todo, where: t.owner_id == ^user_id))
  end
end
@spec shape!(shape_definition(), shape_specification()) :: Phoenix.Sync.PredefinedShape.t()
Returns a shape definition for the given params.
Examples
- An Ecto.Schema module:
  Phoenix.Sync.shape!(MyPlugApp.Todos.Todo)
- An Ecto query:
  Phoenix.Sync.shape!(from(t in Todos.Todo, where: t.owner_id == ^user_id))
- A changeset/1 function which defines the table and columns:
  Phoenix.Sync.shape!(&Todos.Todo.changeset/1)
- A changeset/1 function plus a where clause:
  Phoenix.Sync.shape!(
    &Todos.Todo.changeset/1,
    where: "completed = false"
  )
  or a parameterized where clause:
  Phoenix.Sync.shape!(
    &Todos.Todo.changeset/1,
    where: "completed = $1",
    params: [false]
  )
- A keyword list defining the shape parameters:
  Phoenix.Sync.shape!(
    table: "todos",
    namespace: "my_app",
    where: "completed = $1",
    params: [false]
  )
Transforms
Using the transform option it's possible to modify the sync messages before
they are sent to the clients via the sync router macro or sync_render within
your controllers.
  Phoenix.Sync.shape!(
    table: "todos",
    transform: &MyApp.Todos.transform/1
  )
The transform function is passed the change messages in raw form and can transform the messages to a limited extent as required by the application.
The transform process is effectively a Stream.flat_map/2 operation over
the sync messages, so if you want to use pattern matching to perform some
kind of additional filtering operation to remove messages, e.g. based on some
authorization logic, then you can simply return an empty list:
  # don't send delete messages to the client
  def transform(%{"headers" => %{"operation" => "delete"}}), do: []
  def transform(message), do: [message]
Removing messages from the stream can be useful when you want to perform additional runtime filtering for authorization reasons that you're not able to do at the database level. Be aware that this can impact the consistency of the client state, so it is an advanced feature that should be used with care.
The messages passed to the transform function are of the form:
  %{
    "key" => key,
    "headers" => %{"operation" => operation, ...},
    "value" => %{"column_name" => column_value, ...}
  }
- key is a unique identifier for the row formed of the namespaced table name plus the values of the row's primary key(s). DO NOT MODIFY THIS VALUE.
- headers is a map of metadata about the change. The operation key will have one of the values "insert", "update" or "delete". You should leave this as-is unless you have a very good reason to modify it.
- value is the actual row data. Unless the shape is defined with replica: :full, only insert operations will contain the full row data. update operations will only contain the columns that have changed and delete operations will only contain the primary key columns.
You can modify the values of the value map, as required by your application,
but you should only modify values in a way that's compatible with the
column's datatype. E.g. don't concat an integer column with a string (unless
the resulting string will parse as an integer...). It is also unwise to
modify the primary key values of any row unless you can be sure not to cause
conflicts. Any column values you add that aren't in the backing Postgres
table will be passed through to the client as-is.
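The following is a minimal sketch of a transform that both filters and rewrites messages of the form described above. The `TransformSketch` module, the `notes` column and the sample messages (including their `key` values) are assumptions made for illustration. `Enum.flat_map/2` stands in here for the flat-map step that the library performs over the real stream.

```elixir
defmodule TransformSketch do
  # Hypothetical transform: each clause returns a list of messages,
  # so returning [] removes the message from the stream.

  # Don't send delete messages to the client.
  def transform(%{"headers" => %{"operation" => "delete"}}), do: []

  # Redact the (assumed) "notes" text column whenever a change includes it.
  # A string replaces a string, so the value stays compatible with the column type.
  def transform(%{"value" => %{"notes" => _} = value} = message) do
    [%{message | "value" => %{value | "notes" => "[redacted]"}}]
  end

  # Pass everything else through unchanged. "key" and "headers" are never touched.
  def transform(message), do: [message]
end

# Made-up sample messages; the "key" strings are placeholders, not the real key format.
messages = [
  %{
    "key" => "sample-key-1",
    "headers" => %{"operation" => "insert"},
    "value" => %{"id" => "1", "title" => "Buy milk", "notes" => "private"}
  },
  %{
    "key" => "sample-key-1",
    "headers" => %{"operation" => "update"},
    "value" => %{"title" => "Buy oat milk"}
  },
  %{
    "key" => "sample-key-1",
    "headers" => %{"operation" => "delete"},
    "value" => %{"id" => "1"}
  }
]

sent = Enum.flat_map(messages, &TransformSketch.transform/1)

# The function would be attached to a shape as shown earlier:
# Phoenix.Sync.shape!(table: "todos", transform: &TransformSketch.transform/1)
```

Because the update in this sample does not include `notes`, it falls through to the final clause untouched, which is why the redaction clause matches on the presence of the column rather than on the operation.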
When using the raw stream/2 function to
receive a sync stream directly, the transform option is unnecessary and
hence ignored. You should use the functions available in Enum and Stream
to perform any data transforms.
Transform via Ecto.Schema
If you have custom field types in your Ecto.Schema module you can set up a
transform that passes the raw data from the replication stream through the
Ecto load machinery to ensure that the sync stream values match the values
you would see when using Ecto to load data directly from the database.
To do this pass the Ecto.Schema module as the transform function:
Phoenix.Sync.shape!(
  MyApp.Todos.Todo,
  transform: MyApp.Todos.Todo
)
or in a route:
sync "todos", MyApp.Todos.Todo,
  transform: MyApp.Todos.Todo
For this to work you need to implement Jason.Encoder for your schema module
(or JSON.Encoder if you're on Elixir >= 1.18, although Jason will be used
if it is available), e.g.:
defmodule MyApp.Todos.Todo do
  use Ecto.Schema
  @derive {Jason.Encoder, except: [:__meta__]}
  schema "todos" do
    field :title, :string
    field :completed, :boolean, default: false
  end
end
Effect of transform on server load
Normally Phoenix.Sync simply passes the raw encoded JSON message stream
from the backend server straight to the clients, which puts very little load
on the application server.
The transform mechanism requires intercepting, decoding, mutating and
re-encoding every message from the backend server before they are sent to the
client. This could be costly for busy shapes or lots of connected clients.
Limitations
See the documentation of Phoenix.Sync.Router.sync/3 for additional
constraints on transform functions when defined within a route.
Options
When defining a shape via a keyword list, it supports the following options:
- :table (String.t/0) - Required.
- :where - Filter the table according to the where clause. The default value is nil.
- :columns - List of columns to include in the shape. Must include all primary keys. If nil this is equivalent to all columns (SELECT *). The default value is nil.
- :namespace - The namespace the table belongs to. If nil then Postgres will use whatever schema is the default (usually public). The default value is nil.
- :params - Values of positional parameters in the where clause. These will substitute the $i placeholders in the where clause. The default value is nil.
- :replica - Modifies the data sent in update and delete change messages.
  When set to :full the entire row will be sent for updates and deletes, not just the changed columns.
  The default value is :default.
- :transform ((map() -> [map()]) | mfa() | module()) - A transform function to apply to each row.
  This can be either an MFA tuple ({MyModule, :my_fun, [arg1, ...]}), a &MyModule.fun/1 capture or an Ecto.Schema module (depending on use).
  See the documentation of Phoenix.Sync.shape!/2 for more details.