Phoenix.Sync.Writer (Phoenix.Sync v0.4.4)
Provides write-path sync support for Phoenix- or Plug-based apps.
Imagine you're building an application on sync. You've used the read-path sync utilities to sync data into the front-end. If the client then changes the data locally, these writes can be batched up and sent back to the server.
Phoenix.Sync.Writer provides a principled way of ingesting these local writes and applying them to Postgres, in a way that works with and re-uses your existing authorization logic and your existing Ecto.Schemas and Ecto.Changeset validation functions.
This allows you to build instant, offline-capable applications that work with local optimistic state.
Controller example
For example, take a project management app that's using
@TanStack/db to batch up local
optimistic writes and POST them to the Phoenix.Controller
below:
defmodule MutationController do
  use Phoenix.Controller, formats: [:json]

  alias Phoenix.Sync.Writer
  alias Phoenix.Sync.Writer.Format

  def mutate(conn, %{"transaction" => transaction} = _params) do
    user_id = conn.assigns.user_id

    {:ok, txid, _changes} =
      Writer.new()
      |> Writer.allow(
        Projects.Project,
        check: &reject_invalid_params/1,
        load: &Projects.load_for_user(&1, user_id),
        validate: &Projects.Project.changeset/2
      )
      |> Writer.allow(
        Projects.Issue
        # Use the sensible defaults:
        # validate: Projects.Issue.changeset/2
        # etc.
      )
      |> Writer.apply(
        transaction,
        Repo,
        format: Format.TanstackDB
      )

    json(conn, %{txid: txid})
  end
end
The controller constructs a Phoenix.Sync.Writer instance and pipes it through a series of allow/3 calls, registering functions against Ecto.Schemas (in this case Projects.Project and Projects.Issue) to validate and authorize each of these mutation operations before applying them as a single transaction.
This controller can become a single, unified entry point for ingesting writes into your application. You can extend the pipeline with allow/3 calls for every schema that you'd like to be able to ingest changes to.
The check, load, validate, etc. callbacks to the allow function are designed to allow you to re-use your authorization and validation logic from your existing Plug middleware and Ecto.Changeset functions.
Warning
The mutation operations received from clients MUST be considered untrusted.
Though the HTTP operation that uploaded them will have been authenticated and authorized by your existing Plug middleware as usual, the actual content of the request that is turned into writes against your database needs to be validated very carefully against the privileges of the current user.
That's what Phoenix.Sync.Writer
is for: specifying which resources can be
updated and registering functions to authorize and validate the mutation payload.
Usage levels (high, mid, low)
You don't need to use Phoenix.Sync.Writer to ingest write operations using Phoenix. Phoenix already ships with primitives like Ecto.Multi and Ecto.Repo.transaction/2.
However, Phoenix.Sync.Writer provides:
- a number of convenience functions that simplify ingesting mutation operations
- a high-level pipeline that removes a lot of common boilerplate and allows you to re-use your existing Plug and Ecto.Changeset logic
High-level usage
The controller example above uses a higher-level pipeline that removes common boilerplate whilst still allowing flexibility and extensibility. You create an ingest pipeline by instantiating a Phoenix.Sync.Writer instance and piping into allow/3 and apply/4 calls:
{:ok, txid, _changes} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.allow(MyApp.OtherSchema)
  |> Phoenix.Sync.Writer.apply(transaction, Repo, format: MyApp.MutationFormat)
Or, instead of apply/4, you can use separate calls to ingest/3 and then transaction/2.
This allows you to ingest multiple formats, for example:
{:ok, txid} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.ingest(changes, format: MyApp.MutationFormat)
  |> Phoenix.Sync.Writer.ingest(other_changes, parser: &MyApp.MutationFormat.parse_other/1)
  |> Phoenix.Sync.Writer.ingest(more_changes, parser: {MyApp.MutationFormat, :parse_more, []})
  |> Phoenix.Sync.Writer.transaction(MyApp.Repo)
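A custom parser is just a function that turns the raw client payload into operation data. As a rough, self-contained sketch (the module name, payload shape and return shape below are illustrative assumptions; the real contract is defined by the Phoenix.Sync.Writer.Format behaviour):

```elixir
defmodule MyApp.MutationFormat do
  # Hypothetical parser: normalizes an already-JSON-decoded list of
  # client operations into maps with atom keys. The actual parser
  # contract returns {:ok, %Transaction{}} | {:error, term()}; plain
  # maps are used here to keep the sketch dependency-free.
  @actions %{"insert" => :insert, "update" => :update, "delete" => :delete}

  def parse_other(changes) when is_list(changes) do
    {:ok, Enum.map(changes, &normalize/1)}
  end

  defp normalize(%{"type" => type, "table" => table} = op) do
    %{
      operation: Map.fetch!(@actions, type),
      relation: ["public", table],
      data: Map.get(op, "original", %{}),
      changes: Map.get(op, "changes", %{})
    }
  end
end
```

Rejecting unknown operation types via Map.fetch!/2 (rather than String.to_atom/1) avoids creating atoms from untrusted input.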
And at any point you can drop down to the underlying Ecto.Multi using to_multi/1 or to_multi/3:
multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.to_multi(changes, format: MyApp.MutationFormat)

# ... do anything you like with the multi ...

{:ok, changes} = Repo.transaction(multi)
{:ok, txid} = Phoenix.Sync.Writer.txid(changes)
Mid-level usage
This level uses the lower-level transact/4 function, which abstracts the mechanical details of transaction management whilst still allowing you to handle and apply mutation operations yourself:
{:ok, txid} =
  Phoenix.Sync.Writer.transact(
    my_encoded_txn,
    MyApp.Repo,
    fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        MyApp.Repo.insert(...)

      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        MyApp.Repo.update(Ecto.Changeset.cast(...))

      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        {:error, "invalid delete"}
    end,
    format: Phoenix.Sync.Writer.Format.TanstackDB,
    timeout: 60_000
  )
However, with larger applications, this flexibility can become tiresome as you end up repeating boilerplate and defining your own pipeline to authorize, validate and apply changes with the right error handling and return values.
Low-level usage (DIY)
For more advanced cases, if you're comfortable parsing, validating and persisting changes yourself, then the simplest way to use Phoenix.Sync.Writer is to call txid!/1 within Ecto.Repo.transaction/2:
{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    # ... save your changes to the database ...

    # Return the transaction id.
    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end)
This returns the database transaction ID that the changes were applied within. You can return it to the client, which can then monitor the read-path sync stream to detect when the transaction syncs through, at which point the client can discard its local optimistic state.
A convenient way of doing this is to parse the request data into a list of Phoenix.Sync.Writer.Operations using a Phoenix.Sync.Writer.Format. You can then apply the changes yourself by matching on the operation data:
{:ok, %Transaction{} = txn} =
  Phoenix.Sync.Writer.parse_transaction(
    my_encoded_txn,
    format: Phoenix.Sync.Writer.Format.TanstackDB
  )

{:ok, txid} =
  MyApp.Repo.transaction(
    fn ->
      Enum.each(txn.operations, fn
        %{operation: :insert, relation: [_, "todos"], change: change} ->
          # insert a Todo
          :ok

        %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
          # update a Todo
          :ok

        %{operation: :delete, relation: [_, "todos"], data: data} ->
          # for example, if you don't want to allow deletes...
          raise "invalid delete"
      end)

      Phoenix.Sync.Writer.txid!(MyApp.Repo)
    end,
    timeout: 60_000
  )
Transactions
The txid in the return value from apply/4 and txid/1 / txid!/1 allows the Postgres transaction ID to be returned to the client in the response data.
This allows clients to monitor the read-path sync stream and match on the arrival of the same transaction ID. When the client receives this transaction ID back through its sync stream, it knows that it can discard the local optimistic state for that transaction. (This is a more robust way of managing optimistic state than just matching on instance IDs, as it allows local changes to be rebased on concurrent changes to the same data from other users.)
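The bookkeeping this enables can be sketched in a few lines. The module and function names below are illustrative assumptions, not part of Phoenix.Sync: the idea is simply to track pending txids and drop the matching optimistic state as each one arrives back through the sync stream:

```elixir
defmodule PendingTxns do
  # Illustrative sketch: track txids of locally-applied transactions
  # and detect when they round-trip through the read-path sync stream.
  def new, do: MapSet.new()

  # Called after a successful write, with the txid from the server.
  def track(pending, txid), do: MapSet.put(pending, txid)

  # Called for every transaction observed on the sync stream. Returns
  # :discard_local when the txid is one of ours, meaning the matching
  # optimistic state can now be dropped.
  def observe(pending, txid) do
    if MapSet.member?(pending, txid) do
      {:discard_local, MapSet.delete(pending, txid)}
    else
      {:keep, pending}
    end
  end
end
```

In practice this logic lives on the client (e.g. TanStack DB handles it for you); the sketch just shows why matching on txids, rather than instance IDs, is enough to reconcile optimistic state.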
Phoenix.Sync.Writer uses Ecto.Multi's transaction update mechanism under the hood, which means that either all the operations in a client transaction are accepted or none are. See to_multi/1 for how you can hook into the Ecto.Multi after applying your change data.
Compatibility
Phoenix.Sync.Writer can only return transaction ids when connecting to a Postgres database (a repo with adapter: Ecto.Adapters.Postgres). You can use this module for other databases, but the returned txid will be nil.
Client Libraries
Phoenix.Sync.Writer is not coupled to any particular client-side implementation. See Electric's write pattern guides and example code for implementation strategies and examples.
Instead, Phoenix.Sync.Writer provides an adapter pattern where you can register a format adapter or parser function to parse the expected payload format from a client-side library into the struct that Phoenix.Sync.Writer expects.
The currently supported format adapters are:
- TanStack/db - "A reactive client store for building super fast apps on sync"
  Integration:
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.ingest(mutation_data, format: Phoenix.Sync.Writer.Format.TanstackDB)
  |> Phoenix.Sync.Writer.transaction(Repo)
Usage
Much as every controller action must be authenticated, authorized and validated to prevent users writing invalid data or data that they do not have permission to modify, mutations MUST be validated for both correctness (are the given values valid?) and permissions (is the current user allowed to apply the given mutation?).
This dual verification -- of data and permissions -- is performed by a pipeline of application-defined callbacks for every model that you allow writes to:
- check - a function that performs a "pre-flight" sanity check of the user-provided data in the mutation; this should just validate the data and not usually hit the database; checks are performed on all operations in a transaction before proceeding to the next steps in the pipeline; this allows for fast rejection of invalid data before performing more expensive operations
- load - a function that takes the original data and returns the existing model from the database, if it exists, for an update or delete operation
- validate - create and validate an Ecto.Changeset from the source data and mutation changes; this is intended to be compatible with using existing schema changeset functions; note that, as per any changeset function, the validate function can perform both authorization and validation
- pre_apply and post_apply - add arbitrary Ecto.Multi operations to the transaction based on the current operation
See apply/4 and the Callbacks section for how the transaction is processed internally and how best to use these callback functions to express your app's authorization and validation requirements.
Calling new/0 creates an empty writer configuration. This alone does not permit any mutations: in order to allow writes from clients you must call allow/3 with a schema module and some callback functions.
# create an empty writer configuration
writer = Phoenix.Sync.Writer.new()
# allow writes to the `Todos.Todo` table
# using `Todos.check_mutation/1` to validate mutation data before
# touching the database
writer = Phoenix.Sync.Writer.allow(writer, Todos.Todo, check: &Todos.check_mutation/1)
If the table name on the client differs from the Postgres table, then you can
add a table
option that specifies the client table name that this allow/3
call applies to:
# `client_todos` is the name of the `todos` table on the clients
writer =
  Phoenix.Sync.Writer.allow(
    writer,
    Todos.Todo,
    validate: &Todos.validate_mutation/2,
    table: "client_todos"
  )
Callbacks
Check
The check option should be a 1-arity function whose purpose is to test the mutation data against the authorization rules for the application and model before attempting any database access.
It should return :ok if the changes are valid, or {:error, reason} if they're invalid.
If any of the changes fail the auth test, then the entire transaction will be rejected.
This is the first line of defence against malicious writes as it provides a quick check of the data from the clients before any reads or writes to the database.
Note that the writer pipeline checks all the operations before proceeding to load, validate and apply each operation in turn.
def check(%Phoenix.Sync.Writer.Operation{} = operation) do
# :ok or {:error, "..."}
end
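For instance, a check might reject any operation whose changes touch columns outside an allowlist — pure data validation with no database access. (The Todos.Checks module and column list below are illustrative assumptions:)

```elixir
defmodule Todos.Checks do
  # Hypothetical pre-flight check: only these client-supplied columns
  # are ever acceptable in a mutation against the todos table.
  @permitted ~w(id title completed)

  # In the real pipeline the argument is a %Phoenix.Sync.Writer.Operation{};
  # matching on the :changes key keeps this sketch dependency-free.
  def check_mutation(%{changes: changes}) do
    case Map.keys(changes) -- @permitted do
      [] -> :ok
      extra -> {:error, "disallowed columns: #{Enum.join(extra, ", ")}"}
    end
  end
end
```

A check like this rejects the entire transaction cheaply before any load or validate callback touches the database.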
Load
The load callback takes the data in update or delete mutations (i.e. the original data before changes), and uses it to retrieve the original Ecto.Schema struct from the database.
It can be a 1- or 2-arity function. The 1-arity version receives just the data parameters. The 2-arity version receives the Ecto.Repo that the transaction is being applied to and the data parameters.
# 1-arity version
def load(%{"column" => "value"} = data) do
# Repo.get(...)
end
# 2-arity version
def load(repo, %{"column" => "value"} = data) do
# repo.get(...)
end
If not provided, this defaults to Ecto.Repo.get_by/3 using the primary key(s) defined on the model.
For insert operations the load function is not used. Instead, the original struct is created by calling the __struct__/0 function on the Ecto.Schema module.
Validate
The validate callback performs the usual role of a changeset function: to validate the changes against the model's data constraints using the functions in Ecto.Changeset.
It should return an Ecto.Changeset instance (or possibly the original schema struct in the case of deletes). If any of the transaction's changesets are marked as invalid, then the entire transaction is aborted.
If not specified, the validate function defaults to the schema model's standard changeset/2 function, if available.
The callback can be either a 2- or 3-arity function. The 2-arity version will receive the Ecto.Schema struct returned from the load function and the mutation changes. The 3-arity version will receive the loaded struct, the changes and the operation.
# 2-arity version
def changeset(%Todo{} = data, %{} = changes) do
  data
  |> Ecto.Changeset.cast(changes, [:title, :completed])
  |> Ecto.Changeset.validate_required([:title])
end

# 3-arity version
def changeset(%Todo{} = data, %{} = changes, operation)
    when operation in [:insert, :update, :delete] do
  # ...
end
Primary keys
Whether the params for insert operations contain a value for the new primary key is application-specific. It's certainly not required if you have declared your Ecto.Schema model with a primary key set to autogenerate: true.
It's worth noting that if you are accepting primary-key values as part of your insert changes, then you should use UUID primary keys for your models to prevent conflicts.
pre_apply and post_apply
These callbacks run before or after the actual insert, update or delete operation and allow for the addition of side effects to the transaction.
They are passed an empty Ecto.Multi struct, which is then merged into the writer's transaction.
They also allow for more validation/authorization steps as any operation within the callback that returns an "invalid" operation will abort the entire transaction.
def pre_or_post_apply(%Ecto.Multi{} = multi, %Ecto.Changeset{} = change, %Phoenix.Sync.Writer.Context{} = context) do
  multi
  # add some side-effects
  # |> Ecto.Multi.run(Phoenix.Sync.Writer.operation_name(context, :image), fn _repo, _changes ->
  #   with :ok <- File.write(image.name, image.contents) do
  #     {:ok, nil}
  #   end
  # end)
  #
  # validate the current transaction and abort using an {:error, value} tuple
  # |> Ecto.Multi.run(Phoenix.Sync.Writer.operation_name(context, :my_validation), fn _repo, _changes ->
  #   {:error, "reject entire transaction"}
  # end)
end
Note the use of operation_name/2 when adding operations. Every name in the final Ecto.Multi struct must be unique; operation_name/2 generates names that are guaranteed to be unique to the current operation and callback.
Per-operation callbacks
If you want to re-use an existing function on a per-operation basis, then in your write configuration you can define both top-level and per operation callbacks:
Phoenix.Sync.Writer.allow(
  writer,
  Todos.Todo,
  load: &Todos.fetch_for_user(&1, user_id),
  check: &Todos.check_mutation(&1, user_id),
  validate: &Todos.Todo.changeset/2,
  update: [
    # for inserts and deletes, use &Todos.Todo.changeset/2 but for updates
    # use this function
    validate: &Todos.Todo.update_changeset/2,
    pre_apply: &Todos.pre_apply_update_todo/3
  ],
  insert: [
    # optional validate, pre_apply and post_apply
    # overrides for insert operations
  ],
  delete: [
    # optional validate, pre_apply and post_apply
    # overrides for delete operations
  ]
)
End-to-end usage
The combination of the check, load, validate, pre_apply and post_apply functions can be composed to provide strong guarantees of validity.
The aim is to provide an interface as similar to that used in controller functions as possible.
Here we show an example controller module that allows updating of Todos via a standard HTTP PUT update handler and also via HTTP POSTs to the mutations handler, which applies optimistic writes via this module.
We use the load function to validate the ownership of the original Todo by looking up the data using both the id and the user_id. This makes it impossible for user a to update todos belonging to user b.
defmodule MyController do
  use Phoenix.Controller, formats: [:html, :json]

  import Ecto.Query

  alias Phoenix.Sync.Writer

  # The standard HTTP PUT update handler
  def update(conn, %{"id" => id, "todo" => todo_params}) do
    user_id = conn.assigns.user_id

    with {:ok, todo} <- fetch_for_user(%{"id" => id}, user_id),
         :ok <- validate_params(todo_params, user_id),
         {:ok, updated_todo} <- Todos.update(todo, todo_params) do
      redirect(conn, to: ~p"/todos/#{updated_todo.id}")
    end
  end

  # The HTTP POST mutations handler which receives JSON data
  def mutations(conn, %{"transaction" => transaction} = _params) do
    user_id = conn.assigns.user_id

    {:ok, txid, _changes} =
      Writer.new()
      |> Writer.allow(
        Todos.Todo,
        check: &validate_mutation(&1, user_id),
        load: &fetch_for_user(&1, user_id)
      )
      |> Writer.apply(transaction, Repo, format: Writer.Format.TanstackDB)

    json(conn, %{txid: txid})
  end

  # Included here for completeness but in a real app this would be a
  # public function in the Todos context.
  # Because we're validating the ownership of the Todo here we add an
  # extra layer of auth checks, preventing one user from modifying
  # the Todos of another.
  defp fetch_for_user(%{"id" => id}, user_id) do
    case Repo.one(from(t in Todos.Todo, where: t.id == ^id and t.user_id == ^user_id)) do
      nil -> {:error, "not found"}
      todo -> {:ok, todo}
    end
  end

  defp validate_mutation(%Writer.Operation{} = op, user_id) do
    with :ok <- validate_params(op.data, user_id) do
      validate_params(op.changes, user_id)
    end
  end

  defp validate_params(%{"user_id" => user_id}, user_id), do: :ok
  defp validate_params(%{} = _params, _user_id), do: {:error, "invalid user_id"}
end
Because Phoenix.Sync.Writer leverages Ecto.Multi to do the work of applying changes and managing errors, you're also free to extend the actions that are performed with every transaction using pre_apply and post_apply callbacks configured per-table or per-table per-action (insert, update, delete). See allow/3 for more information on the configuration options for each table.
The result of to_multi/1 or to_multi/3 is an Ecto.Multi instance, so you can also append operations using the normal Ecto.Multi functions:
{:ok, txid, _changes} =
  Writer.new()
  |> Writer.allow(Todo, ...)
  |> Writer.to_multi(transaction, parser: &my_transaction_parser/1)
  |> Ecto.Multi.insert(:my_action, %Event{})
  |> Writer.transaction(Repo)
Summary
Functions
Allow writes to the given Ecto.Schema
.
Ingest and write changes to the given repo in a single call.
Add the given changes to the operations that will be applied within a transaction/3
.
Create a new empty writer.
Return a unique operation name for use in pre_apply
or post_apply
callbacks.
Like operation_name/1
but allows for a custom label.
Use the parser configured in the given Writer
instance to decode the given transaction data.
Given a writer configuration created using allow/3
translate the list of
mutations into an Ecto.Multi
operation.
Ingest changes and map them into an Ecto.Multi
instance ready to apply
using Phoenix.Sync.Writer.transaction/3
or Ecto.Repo.transaction/2
.
Apply operations from a mutation directly via a transaction.
Runs the mutation inside a transaction.
Extract the transaction id from changes or from a Ecto.Repo
within a
transaction.
Returns the transaction id or raises on error.
Types
@type allow_opts() :: [
  table: String.t() | [String.t(), ...],
  accept: [operation(), ...],
  check: (Phoenix.Sync.Writer.Operation.t() -> :ok | {:error, term()}),
  before_all: (Ecto.Multi.t() -> Ecto.Multi.t()),
  load:
    (Ecto.Repo.t(), data() -> Ecto.Schema.t() | {:ok, Ecto.Schema.t()} | nil | {:error, String.t()})
    | (data() -> Ecto.Schema.t() | {:ok, Ecto.Schema.t()} | nil | {:error, String.t()}),
  validate:
    (Ecto.Schema.t(), data() -> Ecto.Changeset.t())
    | (Ecto.Schema.t(), data(), operation() -> Ecto.Changeset.t()),
  pre_apply: pre_post_func(),
  post_apply: pre_post_func(),
  insert: operation_opts(),
  update: operation_opts(),
  delete: operation_opts()
]
@type context() :: %Phoenix.Sync.Writer.Context{
  action: term(),
  callback: :load | :validate | :pre_apply | :post_apply,
  changes: Ecto.Multi.changes(),
  index: non_neg_integer(),
  operation: :insert | :update | :delete,
  pk: term(),
  schema: Ecto.Schema.t(),
  writer: term()
}
@type ingest_change() :: {Phoenix.Sync.Writer.Format.t(), Phoenix.Sync.Writer.Format.parser_fun(), Phoenix.Sync.Writer.Format.transaction_data()}
@type operation() :: :insert | :update | :delete
@type operation_opts() :: [ validate: (Ecto.Schema.t(), data() -> Ecto.Changeset.t()), pre_apply: pre_post_func(), post_apply: pre_post_func() ]
@type parse_opts() :: [ format: Phoenix.Sync.Writer.Format.t(), parser: Phoenix.Sync.Writer.Format.parser_fun() ]
@type pre_post_func() :: (Ecto.Multi.t(), Ecto.Changeset.t(), context() -> Ecto.Multi.t())
@type repo_transaction_opts() :: keyword()
@type t() :: %Phoenix.Sync.Writer{ ingest: [ingest_change()], mappings: %{required(binary() | [binary(), ...]) => schema_config()} }
@type transact_opts() :: [parse_opts() | repo_transaction_opts()]
@type txid() :: Phoenix.Sync.Writer.Transaction.id()
Functions
@spec allow(t(), module(), allow_opts()) :: t()
Allow writes to the given Ecto.Schema
.
Only tables specified in calls to allow/3 will be accepted by later calls to transaction/3. Any changes to tables not explicitly defined by allow/3 calls will be rejected, causing the entire transaction to be rejected.
Examples
# allow writes to the Todo table using
# `MyApp.Todos.Todo.check_mutation/1` to validate operations
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
MyApp.Todos.Todo,
check: &MyApp.Todos.check_mutation/1
)
# A more complex configuration adding a `post_apply` callback to inserts
# and using a custom query to load the original database value.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyApp.Todos.Todo,
  load: &MyApp.Todos.get_for_mutation/1,
  check: &MyApp.Todos.check_mutation/1,
  insert: [
    post_apply: &MyApp.Todos.post_apply_insert_mutation/3
  ]
)
Supported options
- :table (String.t() | [String.t(), ...]) - Override the table name of the Ecto.Schema struct to allow for mapping between table names on the client and within Postgres.

  If you pass just a table name, then any schema prefix in the client tables is ignored, so Writer.allow(Todos, table: "todos") will match client operations for ["public", "todos"] and ["application", "todos"] etc.

  If you provide a 2-element list then the mapping will be exact and only client relations matching the full [schema, table] pair will match the given schema:

      Writer.allow(Todos, table: ["public", "todos"])

  will match client operations for ["public", "todos"] but not ["application", "todos"] etc.

  Defaults to Model.__schema__(:source), or, if the Ecto schema module has specified a namespace, [Model.__schema__(:prefix), Model.__schema__(:source)].

- :accept - A list of actions to accept. A transaction containing an operation not in the accept list will be rejected. Defaults to accepting all operations, [:insert, :update, :delete].

- :check (function of arity 1) - A function that validates every %Phoenix.Sync.Writer.Operation{} in the transaction for correctness. This is run before any database access is performed and so provides an efficient way to prevent malicious writes without hitting your database. Defaults to a function that allows all operations: fn _ -> :ok end.

- :before_all (function of arity 1) - Run only once (per transaction) after the parsing and check callbacks have completed and before the load and validate functions run. Useful for pre-loading data from the database that can be shared across all operation callbacks for all the mutations.

  Arguments:
  - multi - an Ecto.Multi struct

  Return value:
  - an Ecto.Multi struct with associated data

  Defaults to no callback.

- :load - A 1- or 2-arity function that accepts either the mutation operation's data, or an Ecto.Repo instance and the mutation data, and returns the original row from the database.

  Arguments:
  - repo - the Ecto.Repo instance passed to apply/4 or transaction/3
  - data - the original operation data

  Valid return values are:
  - struct() - an Ecto.Schema struct, which must match the module passed to allow/3
  - {:ok, struct()} - as above but wrapped in an :ok tuple
  - nil - if no row matches the search criteria, or
  - {:error, String.t()} - as nil but with a custom error string

  A return value of nil or {:error, reason} will abort the transaction.

  This function is only used for updates or deletes. For inserts, the __struct__/0 function defined by Ecto.Schema is used to create an empty schema struct.

  Examples:

      # load from a known Repo
      load: fn %{"id" => id} -> MyApp.Repo.get(Todos.Todo, id) end

      # load from the repo passed to Phoenix.Sync.Writer.transaction/2
      load: fn repo, %{"id" => id} -> repo.get(Todos.Todo, id) end

  If not provided, defaults to Ecto.Repo.get_by/3 using the table's schema module and its primary keys.

- :validate - a 2- or 3-arity function that returns an Ecto.Changeset for a given mutation.

  Callback params:
  - data - an Ecto.Schema struct matching the one used when calling allow/2, returned from the load function
  - changes - a map of changes to apply to the data
  - operation (for 3-arity callbacks only) - the operation action, one of :insert, :update or :delete

  At absolute minimum, this should call Ecto.Changeset.cast/3 to validate the proposed data:

      def my_changeset(data, changes, _operation) do
        Ecto.Changeset.cast(data, changes, @permitted_columns)
      end

  Defaults to the given model's changeset/2 function if defined; raises if no changeset function can be found.

- :pre_apply (pre_post_func/0) - an optional callback that allows for the pre-pending of operations to the Ecto.Multi representing a mutation transaction. It should be a 3-arity function.

  Arguments:
  - multi - an empty %Ecto.Multi{} instance that you should apply your actions to
  - changeset - the changeset representing the individual mutation operation
  - context - the current change context

  The result should be the Ecto.Multi instance, which will be merged with the one representing the mutation operation.

  Because every action in an Ecto.Multi must have a unique key, we advise using the operation_name/2 function to generate a unique operation name based on the context:

      def pre_apply(multi, changeset, context) do
        name = Phoenix.Sync.Writer.operation_name(context, :event_insert)
        Ecto.Multi.insert(multi, name, %Event{todo_id: changeset.data.id})
      end

  Defaults to nil.

- :post_apply (pre_post_func/0) - an optional callback function that allows for the appending of operations to the Ecto.Multi representing a mutation transaction. See the docs for :pre_apply for the function signature and arguments. Defaults to nil.

- :insert (operation_opts/0) - Callbacks for validating and modifying insert operations.

  Accepts definitions for the validate, pre_apply and post_apply functions for insert operations that will override the top-level equivalents. See the documentation for allow/3. The only difference with these callback functions is that the action parameter is redundant and therefore not passed. Defaults to [], using the top-level functions for all operations.

  - :validate (function of arity 2) - A 2-arity function that returns a changeset for the given mutation data.
    Arguments:
    - schema - the original Ecto.Schema model returned from the load function
    - changes - a map of changes from the mutation operation
    Return value: an Ecto.Changeset.
  - :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi. Arguments and return value as per the global pre_apply callback.
  - :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi. Arguments and return value as per the global post_apply callback.

- :update (operation_opts/0) - Callbacks for validating and modifying update operations. See the documentation for insert; the same :validate, :pre_apply and :post_apply sub-options apply.

- :delete (operation_opts/0) - Callbacks for validating and modifying delete operations. See the documentation for insert; the same :validate, :pre_apply and :post_apply sub-options apply.
@spec apply( t(), Phoenix.Sync.Writer.Format.transaction_data(), Ecto.Repo.t(), transact_opts() ) :: {:ok, txid(), Ecto.Multi.changes()} | Ecto.Multi.failure()
Ingest and write changes to the given repo in a single call.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.apply(changes, Repo, parser: &MyFormat.parse/1)
is equivalent to:
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.ingest(changes, parser: &MyFormat.parse/1)
|> Phoenix.Sync.Writer.transaction(Repo)
@spec ingest(t(), Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) :: t()
Add the given changes to the operations that will be applied within a transaction/3
.
Examples:
{:ok, txid} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.ingest(changes, format: MyApp.MutationFormat)
  |> Phoenix.Sync.Writer.ingest(other_changes, parser: &MyApp.MutationFormat.parse_other/1)
  |> Phoenix.Sync.Writer.ingest(more_changes, parser: {MyApp.MutationFormat, :parse_more, []})
  |> Phoenix.Sync.Writer.transaction(MyApp.Repo)
Supported options:
- :format (Format.t()) - A module implementing the Phoenix.Sync.Writer.Format behaviour.
- :parser (Phoenix.Sync.Writer.Format.parser_fun() | mfa()) - A function that parses some input data and returns a %Transaction{} struct or an error. See Phoenix.Sync.Writer.Format.parse_transaction/1.
@spec new() :: t()
Create a new empty writer.
Empty writers will reject writes to any tables. You should configure writes
to the permitted tables by calling allow/3
.
Return a unique operation name for use in pre_apply
or post_apply
callbacks.
Ecto.Multi requires that all operation names be unique within a transaction. This function gives you a simple way to generate a name for your own operations that is guaranteed not to conflict with any other.
Example:
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyModel,
  pre_apply: fn multi, changeset, context ->
    name = Phoenix.Sync.Writer.operation_name(context)
    Ecto.Multi.insert(multi, name, AuditEvent.for_changeset(changeset))
  end
)
Like operation_name/1
but allows for a custom label.
@spec parse_transaction(Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) :: {:ok, Phoenix.Sync.Writer.Transaction.t()} | {:error, term()}
Use the parser configured in the given Writer
instance to decode the given transaction data.
This can be used to handle mutation operations explicitly:
{:ok, txn} = Phoenix.Sync.Writer.parse_transaction(my_json_tx_data, format: Phoenix.Sync.Writer.Format.TanstackDB)
{:ok, txid} =
Repo.transaction(fn ->
Enum.each(txn.operations, fn operation ->
# do something with the given operation
# raise if something is wrong...
end)
# return the transaction id
Phoenix.Sync.Writer.txid!(Repo)
end)
@spec to_multi(t()) :: Ecto.Multi.t()
Given a writer configuration created using allow/3, translate the list of
mutations into an Ecto.Multi operation.
Example:
%Ecto.Multi{} = multi =
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo, check: &my_check_function/1)
|> Phoenix.Sync.Writer.allow(MyApp.Options.Option, check: &my_check_function/1)
|> Phoenix.Sync.Writer.ingest(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
|> Phoenix.Sync.Writer.to_multi()
If you want to add extra operations to the mutation transaction, beyond those
applied by any pre_apply or post_apply callbacks in your mutation config,
then use the functions in Ecto.Multi to add those as normal.
Use transaction/3
to apply the changes to the database and return the
transaction id.
to_multi/1
builds an Ecto.Multi
struct containing the operations required to
write the mutation operations to the database.
The order of operation is:
1. Parse
The transaction data is parsed, using either the format
or the parser
function
supplied in ingest/3
.
2. Check
The user input data in each operation in the transaction is tested for validity
via the check
function.
At this point no database operations have taken place. Errors at the parse or
check stage result in an early exit. The purpose of the check callback is to
sanity-check the incoming mutation data against basic sanitization rules, much
as you would do with Plug middleware and controller params pattern matching.
Now that we have a list of validated mutation operations, the next step is:
3. Before-all
Perform any actions defined in the before_all
callback.
This only happens once per transaction: the first time the model owning the callback is included in the operation list.
The following actions happen once per operation in the transaction:
4. Load
The load
function is called to retrieve the source row from the database
(for update
and delete
operations), or the schema's __struct__/0
function is called to instantiate an empty struct (insert
).
5. Validate
The validate
function is called with the result of the load
function
and the operation's changes.
6. Pre-apply
The pre_apply
callback is called with a multi
instance, the result of the
validate
function and the current Context
. The result is
merged into the transaction's ongoing Ecto.Multi
.
7. Apply
The actual operation is applied to the database using one of
Ecto.Multi.insert/4, Ecto.Multi.update/4 or Ecto.Multi.delete/4.
8. Post-apply
Finally the post_apply
callback is called.
Any error in any of these stages will abort the entire transaction and leave your database untouched.
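The stages above correspond directly to the options passed to allow/3. As a rough sketch, annotating each callback with its stage number (the schema MyApp.Todos.Todo and the helpers shown are hypothetical, and the exact shape of the values passed to check and load is an assumption; consult the allow/3 docs for the authoritative signatures):

```elixir
# Hypothetical example: MyApp.Todos.Todo, MyApp.Repo and AuditLog are
# placeholder names; callback argument shapes are illustrative assumptions.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyApp.Todos.Todo,
  # 2. Check: reject malformed input before any database work happens
  check: &my_check_function/1,
  # 4. Load: fetch the source row for update/delete operations
  load: &MyApp.Todos.load_todo/1,
  # 5. Validate: standard Ecto changeset validation
  validate: &MyApp.Todos.Todo.changeset/2,
  # 6. Pre-apply: merge extra operations into the ongoing Ecto.Multi
  pre_apply: fn multi, changeset, context ->
    name = Phoenix.Sync.Writer.operation_name(context)
    Ecto.Multi.insert(multi, name, AuditLog.for_changeset(changeset))
  end
)
# 1. Parse: the format module decodes the raw transaction data
|> Phoenix.Sync.Writer.ingest(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
# 7./8. Apply and post-apply run when the resulting multi is executed
|> Phoenix.Sync.Writer.to_multi()
```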
@spec to_multi(t(), Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) :: Ecto.Multi.t()
Ingest changes and map them into an Ecto.Multi
instance ready to apply
using Phoenix.Sync.Writer.transaction/3
or Ecto.Repo.transaction/2
.
This is a wrapper around ingest/3
and to_multi/1
.
Example:
%Ecto.Multi{} = multi =
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo, check: &my_check_function/1)
|> Phoenix.Sync.Writer.allow(MyApp.Options.Option, check: &my_check_function/1)
|> Phoenix.Sync.Writer.to_multi(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
@spec transact( Phoenix.Sync.Writer.Format.transaction_data(), Ecto.Repo.t(), operation_fun :: (Phoenix.Sync.Writer.Operation.t() -> :ok | {:ok, any()} | {:error, any()}), transact_opts() ) :: {:ok, txid()} | {:error, any()}
Apply operations from a mutation directly via a transaction.
operation_fun
is a 1-arity function that receives each of the
%Phoenix.Sync.Writer.Operation{}
structs within the mutation data and
should apply them appropriately. It should return :ok
or {:ok, result}
if
successful or {:error, reason}
if the operation is invalid or failed to
apply. If any operation returns {:error, _}
or raises then the entire
transaction is aborted.
This function will return {:error, reason}
if the transaction data fails to parse.
{:ok, txid} =
Phoenix.Sync.Writer.transact(
my_encoded_txn,
MyApp.Repo,
fn
%{operation: :insert, relation: [_, "todos"], change: change} ->
MyApp.Repo.insert(...)
%{operation: :update, relation: [_, "todos"], data: data, change: change} ->
MyApp.Repo.update(Ecto.Changeset.cast(...))
%{operation: :delete, relation: [_, "todos"], data: data} ->
# we don't allow deletes...
{:error, "invalid delete"}
end,
format: Phoenix.Sync.Writer.Format.TanstackDB,
timeout: 60_000
)
Any of the opts
not used by this module are passed onto the
Ecto.Repo.transaction/2
call.
This is equivalent to the below:
{:ok, txn} =
Phoenix.Sync.Writer.parse_transaction(
my_encoded_txn,
format: Phoenix.Sync.Writer.Format.TanstackDB
)
{:ok, txid} =
MyApp.Repo.transaction(fn ->
Enum.each(txn.operations, fn
%{operation: :insert, relation: [_, "todos"], change: change} ->
# insert a Todo
%{operation: :update, relation: [_, "todos"], data: data, change: change} ->
# update a Todo
%{operation: :delete, relation: [_, "todos"], data: data} ->
# we don't allow deletes...
raise "invalid delete"
end)
Phoenix.Sync.Writer.txid!(MyApp.Repo)
end, timeout: 60_000)
@spec transaction(t() | Ecto.Multi.t(), Ecto.Repo.t(), keyword()) :: {:ok, txid(), Ecto.Multi.changes()} | Ecto.Multi.failure()
Runs the mutation inside a transaction.
Since the mutation operation is expressed as an Ecto.Multi
operation, see
the Ecto.Repo
docs
for the result if any of your mutations returns an error.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo)
|> Phoenix.Sync.Writer.allow(MyApp.Options.Option)
|> Phoenix.Sync.Writer.ingest(
changes,
format: Phoenix.Sync.Writer.Format.TanstackDB
)
|> Phoenix.Sync.Writer.transaction(MyApp.Repo)
|> case do
{:ok, txid, _changes} ->
# return the txid to the client
Plug.Conn.send_resp(conn, 200, Jason.encode!(%{txid: txid}))
{:error, _failed_operation, failed_value, _changes_so_far} ->
# extract the error message from the changeset returned as `failed_value`
error =
Ecto.Changeset.traverse_errors(failed_value, fn {msg, opts} ->
Regex.replace(~r/%{(\w+)}/, msg, fn _, key ->
opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
end)
end)
Plug.Conn.send_resp(conn, 400, Jason.encode!(error))
end
Also supports normal fun/0 or fun/1 style transactions much like
Ecto.Repo.transaction/2
, returning the txid of the operation:
{:ok, txid, todo} =
Phoenix.Sync.Writer.transaction(fn ->
Repo.insert!(changeset)
end, Repo)
@spec txid(Ecto.Multi.changes()) :: {:ok, txid()} | :error
Extract the transaction id from changes or from an Ecto.Repo within a
transaction.
This allows you to use a standard Ecto.Repo.transaction/2
call to apply
mutations defined using apply/2
and extract the transaction id afterwards.
Example
{:ok, changes} =
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo)
|> Phoenix.Sync.Writer.allow(MyApp.Options.Option)
|> Phoenix.Sync.Writer.to_multi(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
|> MyApp.Repo.transaction()
{:ok, txid} = Phoenix.Sync.Writer.txid(changes)
It also allows you to get a transaction id from any active transaction:
MyApp.Repo.transaction(fn ->
{:ok, txid} = Phoenix.Sync.Writer.txid(MyApp.Repo)
end)
Attempting to run txid/1
on a repo outside a transaction will return an
error.
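To illustrate the difference (a sketch, assuming a configured MyApp.Repo):

```elixir
# Inside an open transaction the Postgres transaction id is available:
MyApp.Repo.transaction(fn ->
  {:ok, _txid} = Phoenix.Sync.Writer.txid(MyApp.Repo)
end)

# Outside a transaction there is no transaction id to report,
# so txid/1 returns :error (per its typespec):
:error = Phoenix.Sync.Writer.txid(MyApp.Repo)
```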
@spec txid!(Ecto.Multi.changes()) :: txid()
Returns the transaction id or raises on an error.
See txid/1
.