Phoenix.Sync.Writer (Phoenix.Sync v0.6.1)
Provides write-path sync support for Phoenix- or Plug-based apps.
Imagine you're building an application on sync. You've used the read-path sync utilities to sync data into the front-end. If the client then changes the data locally, these writes can be batched up and sent back to the server.
Phoenix.Sync.Writer provides a principled way of ingesting these local writes
and applying them to Postgres, in a way that works with and re-uses your existing
authorization logic and your existing Ecto.Schemas and Ecto.Changeset validation
functions.
This allows you to build instant, offline-capable applications that work with local optimistic state.
Requirements
Phoenix.Sync.Writer requires Ecto.
It's perfectly possible to use Phoenix.Sync without any database
connection (syncing via HTTP), but the write-path requires both a database
and Ecto.
Controller example
For example, take a project management app that's using
@TanStack/db to batch up local
optimistic writes and POST them to the Phoenix.Controller below:
defmodule MutationController do
  use Phoenix.Controller, formats: [:json]
  alias Phoenix.Sync.Writer
  alias Phoenix.Sync.Writer.Format
  def mutate(conn, %{"transaction" => transaction} = _params) do
    user_id = conn.assigns.user_id
    {:ok, txid, _changes} =
      Writer.new()
      |> Writer.allow(
        Projects.Project,
        check: &reject_invalid_params/1,
        load: &Projects.load_for_user(&1, user_id),
        validate: &Projects.Project.changeset/2
      )
      |> Writer.allow(
        # Use the sensible defaults:
        # validate: Projects.Issue.changeset/2
        # etc.
        Projects.Issue
      )
      |> Writer.apply(
        transaction,
        Repo,
        format: Format.TanstackDB
      )
    json(conn, %{txid: txid})
  end
end
The controller constructs a Phoenix.Sync.Writer instance and pipes
it through a series of allow/3 calls, registering functions against
Ecto.Schemas (in this case Projects.Project and Projects.Issue) to
validate and authorize each of these mutation operations before applying
them as a single transaction.
This controller can become a single, unified entry point for ingesting writes
into your application. You can extend the pipeline with allow/3 calls for
every schema that you'd like to be able to ingest changes to.
The check, load, validate, etc. callbacks to the allow
function are designed to allow you to re-use your authorization and validation
logic from your existing Plug middleware and Ecto.Changeset functions.
Warning
The mutation operations received from clients MUST be considered as untrusted.
Though the HTTP operation that uploaded them will have been authenticated and authorized by your existing Plug middleware as usual, the actual content of the request that is turned into writes against your database needs to be validated very carefully against the privileges of the current user.
That's what Phoenix.Sync.Writer is for: specifying which resources can be
updated and registering functions to authorize and validate the mutation payload.
Usage levels (high, mid, low)
You don't need to use Phoenix.Sync.Writer to ingest write operations using Phoenix.
Phoenix already ships with primitives like Ecto.Multi and Ecto.Repo.transaction/2.
However, Phoenix.Sync.Writer provides:
- a number of convenience functions that simplify ingesting mutation operations
- a high-level pipeline that dries up a lot of common boilerplate and allows you to re-use
your existing Plug and Ecto.Changeset logic
High-level usage
The controller example above uses a higher level pipeline that dries up common
boilerplate, whilst still allowing flexibility and extensibility. You create an
ingest pipeline by instantiating a Phoenix.Sync.Writer instance and piping into
allow/3 and apply/4 calls:
{:ok, txid, _changes} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.allow(MyApp.OtherSchema)
  |> Phoenix.Sync.Writer.apply(transaction, Repo, format: MyApp.MutationFormat)
Or, instead of apply/4 you can use separate calls to ingest/3 and then transaction/2.
This allows you to ingest multiple formats, for example:
{:ok, txid} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.ingest(changes, format: MyApp.MutationFormat)
  |> Phoenix.Sync.Writer.ingest(other_changes, parser: &MyApp.MutationFormat.parse_other/1)
  |> Phoenix.Sync.Writer.ingest(more_changes, parser: {MyApp.MutationFormat, :parse_more, []})
  |> Phoenix.Sync.Writer.transaction(MyApp.Repo)
And at any point you can drop down / eject out to the underlying Ecto.Multi using
to_multi/1 or to_multi/3:
multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.to_multi(changes, format: MyApp.MutationFormat)
# ... do anything you like with the multi ...
{:ok, changes} = Repo.transaction(multi)
{:ok, txid} = Phoenix.Sync.Writer.txid(changes)
Mid-level usage
For more control, you can use the lower-level transact/4 function.
This abstracts the mechanical details of transaction management whilst
still allowing you to handle and apply mutation operations yourself:
{:ok, txid} =
  Phoenix.Sync.Writer.transact(
    my_encoded_txn,
    MyApp.Repo,
    fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        MyApp.Repo.insert(...)
      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        MyApp.Repo.update(Ecto.Changeset.cast(...))
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        {:error, "invalid delete"}
    end,
    format: Phoenix.Sync.Writer.Format.TanstackDB,
    timeout: 60_000
  )
However, with larger applications, this flexibility can become tiresome as you end up repeating boilerplate and defining your own pipeline to authorize, validate and apply changes with the right error handling and return values.
Low-level usage (DIY)
For the more advanced cases, if you're comfortable parsing, validating and persisting
changes yourself then the simplest way to use Phoenix.Sync.Writer is to use txid!/1
within Ecto.Repo.transaction/2:
{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    # ... save your changes to the database ...
    # Return the transaction id.
    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end)
This returns the database transaction ID that the changes were applied within. This allows you to return it to the client, which can then monitor the read-path sync stream to detect when the transaction syncs through, at which point the client can discard its local optimistic state.
A convenient way of doing this is to parse the request data into a list of
Phoenix.Sync.Writer.Operations using a Phoenix.Sync.Writer.Format.
You can then apply the changes yourself by matching on the operation data:
{:ok, %Transaction{operations: operations}} =
  Phoenix.Sync.Writer.parse_transaction(
    my_encoded_txn,
    format: Phoenix.Sync.Writer.Format.TanstackDB
  )
{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    Enum.each(operations, fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        # insert a Todo
      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        # update a Todo
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # for example, if you don't want to allow deletes...
        raise "invalid delete"
    end)
    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end, timeout: 60_000)
Transactions
The txid in the return value from apply/4 and txid/1 / txid!/1 allows the
Postgres transaction ID to be returned to the client in the response data.
This allows clients to monitor the read-path sync stream and match on the arrival of the same transaction id. When the client receives this transaction id back through its sync stream, it knows that it can discard the local optimistic state for that transaction. (This is a more robust way of managing optimistic state than just matching on instance IDs, as it allows for local changes to be rebased on concurrent changes to the same data from other users.)
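The bookkeeping described above can be sketched in a few lines. This is a minimal illustration only: the `OptimisticState` module and its function names are hypothetical and not part of Phoenix.Sync, and a real client would usually be written in JavaScript rather than Elixir. It assumes the client keeps its pending optimistic changes keyed by the txid returned from the server.

```elixir
defmodule OptimisticState do
  # Hypothetical client-side bookkeeping; not part of Phoenix.Sync.

  # Pending optimistic changes, keyed by the txid the server returned.
  def new, do: %{}

  # Remember the local changes that were accepted under `txid`.
  def track(pending, txid, local_changes), do: Map.put(pending, txid, local_changes)

  # Called for each txid seen on the read-path sync stream.
  # Returns {discarded_changes_or_nil, remaining_pending}.
  def synced(pending, txid), do: Map.pop(pending, txid)
end

pending =
  OptimisticState.new()
  |> OptimisticState.track(1001, [%{id: "a", title: "draft"}])
  |> OptimisticState.track(1002, [%{id: "b", title: "other"}])

# txid 1001 arrives on the sync stream, so its optimistic state can be dropped.
{discarded, pending} = OptimisticState.synced(pending, 1001)
```

After the call, `discarded` holds the changes tracked under 1001 and `pending` only contains the entry for 1002. A txid that was never tracked yields `nil` and leaves the pending state unchanged.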
Phoenix.Sync.Writer uses Ecto.Multi's transaction update mechanism
under the hood, which means that either all the operations in a client
transaction are accepted or none are. See to_multi/1 for how you can hook
into the Ecto.Multi after applying your change data.
Compatibility
Phoenix.Sync.Writer can only return transaction ids when connecting to
a Postgres database (a repo with adapter: Ecto.Adapters.Postgres). You can
use this module for other databases, but the returned txid will be nil.
Client Libraries
Phoenix.Sync.Writer is not coupled to any particular client-side implementation.
Instead, it provides an adapter pattern where you can register
a format adapter or parser function to parse the expected payload format from a client-side library
into the struct that Phoenix.Sync.Writer expects.
See Electric's write pattern guides and example code
for implementation strategies and examples.
The currently supported format adapters are:
- TanStack/db "A reactive client store for building super fast apps on sync"
  Integration:
    Phoenix.Sync.Writer.new()
    |> Phoenix.Sync.Writer.ingest(mutation_data, format: Phoenix.Sync.Writer.Format.TanstackDB)
    |> Phoenix.Sync.Writer.transaction(Repo)
Usage
Much as every controller action must be authenticated, authorized and validated to prevent users writing invalid data or data that they do not have permission to modify, mutations MUST be validated for both correctness (are the given values valid?) and permissions (is the current user allowed to apply the given mutation?).
This dual verification -- of data and permissions -- is performed by a pipeline of application-defined callbacks for every model that you allow writes to:
- check - a function that performs a "pre-flight" sanity check of the user-provided data in the mutation; this should just validate the data and not usually hit the database; checks are performed on all operations in a transaction before proceeding to the next steps in the pipeline; this allows for fast rejection of invalid data before performing more expensive operations
- load - a function that takes the original data and returns the existing model from the database, if it exists, for an update or delete operation
- validate - create and validate an Ecto.Changeset from the source data and mutation changes; this is intended to be compatible with using existing schema changeset functions; note that, as per any changeset function, the validate function can perform both authorization and validation
- pre_apply and post_apply - add arbitrary Ecto.Multi operations to the transaction based on the current operation
See apply/4 and the Callbacks section for how the transaction is
processed internally and how best to use these callback functions to express your
app's authorization and validation requirements.
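The ordering of these steps, with every check running before any load or validate, can be sketched as follows. This is an illustration of the documented ordering only, not the library's implementation: `PipelineSketch` is a hypothetical module, and the callbacks here return simplified `:ok` / `{:ok, value}` / `{:error, reason}` values rather than Ecto structs and changesets.

```elixir
defmodule PipelineSketch do
  # Illustration only: this is NOT the library's implementation.
  # `callbacks` is a map of :check, :load and :validate functions that
  # return :ok / {:ok, value} / {:error, reason} (a simplification).

  def run(operations, callbacks) do
    # Phase 1: every operation must pass `check` before anything is loaded.
    with :ok <- check_all(operations, callbacks.check),
         # Phase 2: load and validate each operation in turn.
         {:ok, validated} <- load_and_validate(operations, callbacks) do
      {:ok, Enum.reverse(validated)}
    end
  end

  defp check_all(operations, check) do
    Enum.reduce_while(operations, :ok, fn op, :ok ->
      case check.(op) do
        :ok -> {:cont, :ok}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end

  defp load_and_validate(operations, callbacks) do
    Enum.reduce_while(operations, {:ok, []}, fn op, {:ok, acc} ->
      with {:ok, record} <- callbacks.load.(op),
           {:ok, result} <- callbacks.validate.(record, op.changes) do
        {:cont, {:ok, [result | acc]}}
      else
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end
end

callbacks = %{
  check: fn op ->
    if op.changes["title"] == "", do: {:error, "empty title"}, else: :ok
  end,
  load: fn op -> {:ok, %{"id" => op.data["id"]}} end,
  validate: fn record, changes -> {:ok, Map.merge(record, changes)} end
}

ops = [%{data: %{"id" => 1}, changes: %{"title" => "Buy milk"}}]
result = PipelineSketch.run(ops, callbacks)
```

Because the checks run as a separate first pass, a single failing operation rejects the whole batch before any load callback touches the database.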
Calling new/0 creates an empty writer configuration.
parser. But this alone does not permit any mutations. In order to allow writes
from clients you must call allow/3 with a schema module and some callback functions.
# create an empty writer configuration
writer = Phoenix.Sync.Writer.new()
# allow writes to the `Todos.Todo` table
# using `Todos.check_mutation/1` to validate mutation data before
# touching the database
writer = Phoenix.Sync.Writer.allow(writer, Todos.Todo, check: &Todos.check_mutation/1)
If the table name on the client differs from the Postgres table, then you can
add a table option that specifies the client table name that this allow/3
call applies to:
# `client_todos` is the name of the `todos` table on the clients
writer =
  Phoenix.Sync.Writer.allow(
    writer,
    Todos.Todo,
    validate: &Todos.validate_mutation/2,
    table: "client_todos"
  )
Callbacks
Check
The check option should be a 1-arity function whose purpose is to test
the mutation data against the authorization rules for the application and
model before attempting any database access.
It should return :ok if the changes are valid, or {:error, reason} if
they're invalid.
If any of the changes fail the auth test, then the entire transaction will be rejected.
This is the first line of defence against malicious writes as it provides a quick check of the data from the clients before any reads or writes to the database.
Note that the writer pipeline checks all the operations before proceeding to load, validate and apply each operation in turn.
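As a sketch of what such a pre-flight check might look like, the function below rejects any insert or update that touches a column outside an allow-list, without reading from the database. The module name, the column list and the error messages are hypothetical. It also assumes that operations expose `:operation` and `:changes` fields and that change maps use string keys, so adjust these to match your payloads.

```elixir
defmodule MyApp.Todos.Checks do
  # Hypothetical pre-flight check. Assumes operations expose :operation
  # and :changes fields and that change maps are string-keyed.
  @allowed_columns ~w(id title completed user_id)

  # Deletes carry no changes to inspect here.
  def check_mutation(%{operation: :delete}), do: :ok

  def check_mutation(%{operation: op, changes: changes})
      when op in [:insert, :update] and is_map(changes) do
    case Map.keys(changes) -- @allowed_columns do
      [] -> :ok
      unknown -> {:error, "unexpected columns: #{Enum.join(unknown, ", ")}"}
    end
  end

  # Anything else is rejected outright.
  def check_mutation(_operation), do: {:error, "malformed operation"}
end
```

It could then be registered with `check: &MyApp.Todos.Checks.check_mutation/1`. In every case the callback has the following shape: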
def check(%Phoenix.Sync.Writer.Operation{} = operation) do
  # :ok or {:error, "..."}
end
Load
The load callback takes the data in update or delete mutations (i.e.
the original data before changes), and uses it to retrieve the original
Ecto.Schema struct from the database.
It can be a 1- or 2-arity function. The 1-arity version receives just the
data parameters. The 2-arity version receives the Ecto.Repo that the
transaction is being applied to and the data parameters.
# 1-arity version
def load(%{"column" => "value"} = data) do
  # Repo.get(...)
end
# 2-arity version
def load(repo, %{"column" => "value"} = data) do
  # repo.get(...)
end
If not provided, load defaults to using Ecto.Repo.get_by/3 with the primary
key(s) defined on the model.
For insert operations this load function is not used. Instead, the original
struct is created by calling the __struct__/0 function on the Ecto.Schema
module.
Validate
The validate callback performs the usual role of a changeset function: to
validate the changes against the model's data constraints using the functions
in Ecto.Changeset.
It should return an Ecto.Changeset instance (or possibly the original
schema struct in the case of deletes). If any of the transaction's
changesets are marked as invalid, then the entire transaction is aborted.
If not specified, the validate function defaults to the schema model's
standard changeset/2 function if available.
The callback can be either a 2- or 3-arity function.
The 2-arity version will receive the Ecto.Schema struct returned from the
load function and the mutation changes. The 3-arity version will receive
the loaded struct, the changes and the operation.
# 2-arity version
def changeset(%Todo{} = data, %{} = changes) do
  data
  |> Ecto.Changeset.cast(changes, [:title, :completed])
  |> Ecto.Changeset.validate_required([:title])
end
# 3-arity version
def changeset(%Todo{} = data, %{} = changes, operation)
    when operation in [:insert, :update, :delete] do
  # ...
end
Primary keys
Whether the params for insert operations contain a value for the new primary
key is application specific. It's certainly not required if you have declared your
Ecto.Schema model with a primary key set to autogenerate: true.
It's worth noting that if you are accepting primary key values as part of
your insert changes, then you should use UUID primary keys for your models
to prevent conflicts.
pre_apply and post_apply
These callbacks, run before or after the actual insert, update or
delete operation, allow for the addition of side effects to the transaction.
They are passed an empty Ecto.Multi struct, which is then
merged into the writer's transaction.
They also allow for more validation/authorization steps as any operation within the callback that returns an "invalid" operation will abort the entire transaction.
def pre_or_post_apply(%Ecto.Multi{} = multi, %Ecto.Changeset{} = change, %Phoenix.Sync.Writer.Context{} = context) do
  multi
  # add some side-effects
  # |> Ecto.Multi.run(Phoenix.Sync.Writer.operation_name(context, :image), fn _repo, _changes ->
  #   with :ok <- File.write(image.name, image.contents) do
  #    {:ok, nil}
  #   end
  # end)
  #
  # validate the current transaction and abort using an {:error, value} tuple
  # |> Ecto.Multi.run(Phoenix.Sync.Writer.operation_name(context, :my_validation), fn _repo, _changes ->
  #   {:error, "reject entire transaction"}
  # end)
end
Note the use of operation_name/2 when adding operations. Every name in the
final Ecto.Multi struct must be unique; operation_name/2 generates names
that are guaranteed to be unique to the current operation and callback.
Per-operation callbacks
If you want to re-use an existing function on a per-operation basis, then in your write configuration you can define both top-level and per operation callbacks:
Phoenix.Sync.Writer.allow(
  writer,
  Todos.Todo,
  load: &Todos.fetch_for_user(&1, user_id),
  check: &Todos.check_mutation(&1, user_id),
  validate: &Todos.Todo.changeset/2,
  update: [
    # for inserts and deletes, use &Todos.Todo.changeset/2 but for updates
    # use this function
    validate: &Todos.Todo.update_changeset/2,
    pre_apply: &Todos.pre_apply_update_todo/3
  ],
  insert: [
    # optional validate, pre_apply and post_apply
    # overrides for insert operations
  ],
  delete: [
    # optional validate, pre_apply and post_apply
    # overrides for delete operations
  ]
)
End-to-end usage
The combination of the check, load, validate, pre_apply and
post_apply functions can be composed to provide strong guarantees of
validity.
The aim is to provide an interface as similar to that used in controller functions as possible.
Here we show an example controller module that allows updating of Todos via
a standard HTTP PUT update handler and also via HTTP POSTs to the
mutation handler which applies optimistic writes via this module.
We use the load function to validate the ownership of the original Todo
by looking up the data using both the id and the user_id. This makes it
impossible for user a to update todos belonging to user b.
defmodule MyController do
  use Phoenix.Controller, formats: [:html, :json]
  import Ecto.Query, only: [from: 2]
  alias Phoenix.Sync.Writer
  # The standard HTTP PUT update handler
  def update(conn, %{"todo" => todo_params}) do
    user_id = conn.assigns.user_id
    with %Todos.Todo{} = todo <- fetch_for_user(todo_params, user_id),
         :ok <- validate_params(todo_params, user_id),
         {:ok, updated_todo} <- Todos.update(todo, todo_params) do
      redirect(conn, to: ~p"/todos/#{updated_todo.id}")
    end
  end
  # The HTTP POST mutations handler which receives JSON data
  def mutations(conn, %{"transaction" => transaction} = _params) do
    user_id = conn.assigns.user_id
    {:ok, txid, _changes} =
      Writer.new()
      |> Writer.allow(
        Todos.Todo,
        check: &validate_mutation(&1, user_id),
        load: &fetch_for_user(&1, user_id)
      )
      |> Writer.apply(transaction, Repo, format: Writer.Format.TanstackDB)
    json(conn, %{txid: txid})
  end
  # Included here for completeness but in a real app would be a
  # public function in the Todos context.
  # Because we're validating the ownership of the Todo here we add an
  # extra layer of auth checks, preventing one user from modifying
  # the Todos of another.
  defp fetch_for_user(%{"id" => id}, user_id) do
    from(t in Todos.Todo, where: t.id == ^id and t.user_id == ^user_id)
    |> Repo.one()
  end
  defp validate_mutation(%Writer.Operation{} = op, user_id) do
    with :ok <- validate_params(op.data, user_id) do
      validate_params(op.changes, user_id)
    end
  end
  defp validate_params(%{"user_id" => user_id}, user_id), do: :ok
  defp validate_params(%{} = _params, _user_id), do: {:error, "invalid user_id"}
end
Because Phoenix.Sync.Writer leverages Ecto.Multi to do the work of
applying changes and managing errors, you're also free to extend the actions
that are performed with every transaction using pre_apply and post_apply
callbacks configured per-table or per-table per-action (insert, update,
delete). See allow/3 for more information on the configuration options
for each table.
The result of to_multi/1 or to_multi/3 is an Ecto.Multi instance so you can also just
append operations using the normal Ecto.Multi functions:
{:ok, txid, _changes} =
  Writer.new()
  |> Writer.allow(Todo, ...)
  |> Writer.to_multi(transaction, parser: &my_transaction_parser/1)
  |> Ecto.Multi.insert(:my_action, %Event{})
  |> Writer.transaction(Repo)
Summary
Functions
- allow/3 - Allow writes to the given Ecto.Schema.
- apply/4 - Ingest and write changes to the given repo in a single call.
- ingest/3 - Add the given changes to the operations that will be applied within a transaction/3.
- new/0 - Create a new empty writer.
- operation_name/1 - Return a unique operation name for use in pre_apply or post_apply callbacks.
- operation_name/2 - Like operation_name/1 but allows for a custom label.
- parse_transaction/2 - Use the parser configured in the given Writer instance to decode the given transaction data.
- to_multi/1 - Given a writer configuration created using allow/3 translate the list of mutations into an Ecto.Multi operation.
- to_multi/3 - Ingest changes and map them into an Ecto.Multi instance ready to apply using Phoenix.Sync.Writer.transaction/3 or Ecto.Repo.transaction/2.
- transact/4 - Apply operations from a mutation directly via a transaction.
- transaction/3 - Runs the mutation inside a transaction.
- txid/1 - Extract the transaction id from changes or from an Ecto.Repo within a transaction.
- txid!/1 - Returns a transaction id or raises on an error.
Types
@type allow_opts() :: [
  table: String.t() | [String.t(), ...],
  accept: [operation(), ...],
  check: (Phoenix.Sync.Writer.Operation.t() -> :ok | {:error, term()}),
  before_all: (Ecto.Multi.t() -> Ecto.Multi.t()),
  load:
    (Ecto.Repo.t(), data() -> Ecto.Schema.t() | {:ok, Ecto.Schema.t()} | nil | {:error, String.t()})
    | (data() -> Ecto.Schema.t() | {:ok, Ecto.Schema.t()} | nil | {:error, String.t()}),
  validate:
    (Ecto.Schema.t(), data() -> Ecto.Changeset.t())
    | (Ecto.Schema.t(), data(), operation() -> Ecto.Changeset.t()),
  pre_apply: pre_post_func(),
  post_apply: pre_post_func(),
  insert: operation_opts(),
  update: operation_opts(),
  delete: operation_opts()
]
@type context() :: %Phoenix.Sync.Writer.Context{
  action: term(),
  callback: :load | :validate | :pre_apply | :post_apply,
  changes: Ecto.Multi.changes(),
  index: non_neg_integer(),
  operation: :insert | :update | :delete,
  pk: term(),
  schema: Ecto.Schema.t(),
  writer: term()
}
@type ingest_change() :: {Phoenix.Sync.Writer.Format.t(), Phoenix.Sync.Writer.Format.parser_fun(), Phoenix.Sync.Writer.Format.transaction_data()}
@type operation() :: :insert | :update | :delete
@type operation_opts() :: [ validate: (Ecto.Schema.t(), data() -> Ecto.Changeset.t()), pre_apply: pre_post_func(), post_apply: pre_post_func() ]
@type parse_opts() :: [ format: Phoenix.Sync.Writer.Format.t(), parser: Phoenix.Sync.Writer.Format.parser_fun() ]
@type pre_post_func() :: (Ecto.Multi.t(), Ecto.Changeset.t(), context() -> Ecto.Multi.t())
@type repo_transaction_opts() :: keyword()
@type t() :: %Phoenix.Sync.Writer{ ingest: [ingest_change()], mappings: %{required(binary() | [binary(), ...]) => schema_config()} }
@type transact_opts() :: [parse_opts() | repo_transaction_opts()]
@type txid() :: Phoenix.Sync.Writer.Transaction.id()
Functions
@spec allow(t(), module(), allow_opts()) :: t()
Allow writes to the given Ecto.Schema.
Only tables specified in calls to allow/3 will be accepted by later calls
to transaction/3. Any changes to tables not explicitly defined by allow/3 calls
will be rejected and cause the entire transaction to be rejected.
Examples
# allow writes to the Todo table using
# `MyApp.Todos.Todo.check_mutation/1` to validate operations
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyApp.Todos.Todo,
  check: &MyApp.Todos.check_mutation/1
)
# A more complex configuration adding a `post_apply` callback to inserts
# and using a custom query to load the original database value.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyApp.Todos.Todo,
  load: &MyApp.Todos.get_for_mutation/1,
  check: &MyApp.Todos.check_mutation/1,
  insert: [
    post_apply: &MyApp.Todos.post_apply_insert_mutation/3
  ]
)
Supported options
- :table (String.t() | [String.t(), ...]) - Override the table name of the Ecto.Schema struct to allow for mapping between table names on the client and within Postgres.
  If you pass just a table name, then any schema prefix in the client tables is ignored, so
    Writer.allow(Todos, table: "todos")
  will match client operations for ["public", "todos"] and ["application", "todos"] etc.
  If you provide a 2-element list then the mapping will be exact and only client relations matching the full [schema, table] pair will match the given schema.
    Writer.allow(Todos, table: ["public", "todos"])
  will match client operations for ["public", "todos"] but not ["application", "todos"] etc.
  Defaults to Model.__schema__(:source), or if the Ecto schema module has specified a namespace, [Model.__schema__(:prefix), Model.__schema__(:source)].
- :accept - A list of actions to accept.
  A transaction containing an operation not in the accept list will be rejected.
  Defaults to accepting all operations, [:insert, :update, :delete].
- :check (function of arity 1) - A function that validates every %Phoenix.Sync.Writer.Operation{} in the transaction for correctness.
  This is run before any database access is performed and so provides an efficient way to prevent malicious writes without hitting your database.
  Defaults to a function that allows all operations: fn _ -> :ok end.
- :before_all (function of arity 1) - Run only once (per transaction) after the parsing and check callback have completed and before load and validate functions run.
  Useful for pre-loading data from the database that can be shared across all operation callbacks for all the mutations.
  Arguments:
  - multi - an Ecto.Multi struct
  Return value: Ecto.Multi struct with associated data
  Defaults to no callback.
- :load - A 1- or 2-arity function that accepts either the mutation operation's data or an Ecto.Repo instance and the mutation data and returns the original row from the database.
  Arguments:
  - repo - the Ecto.Repo instance passed to apply/4 or transaction/3
  - data - the original operation data
  Valid return values are:
  - struct() - an Ecto.Schema struct, that must match the module passed to allow/3
  - {:ok, struct()} - as above but wrapped in an :ok tuple
  - nil - if no row matches the search criteria, or
  - {:error, String.t()} - as nil but with a custom error string
  A return value of nil or {:error, reason} will abort the transaction.
  This function is only used for updates or deletes. For inserts, the __struct__/0 function defined by Ecto.Schema is used to create an empty schema struct.
  Examples
    # load from a known Repo
    load: fn %{"id" => id} -> MyApp.Repo.get(Todos.Todo, id) end
    # load from the repo passed to `Phoenix.Sync.Writer.transaction/2`
    load: fn repo, %{"id" => id} -> repo.get(Todos.Todo, id) end
  If not provided defaults to Ecto.Repo.get_by/3 using the table's schema module and its primary keys.
- :validate - a 2- or 3-arity function that returns an Ecto.Changeset for a given mutation.
  Callback params
  - data - an Ecto.Schema struct matching the one used when calling allow/3, returned from the load function.
  - changes - a map of changes to apply to the data.
  - operation - (for 3-arity callbacks only) the operation action, one of :insert, :update or :delete
  At absolute minimum, this should call Ecto.Changeset.cast/3 to validate the proposed data:
    def my_changeset(data, changes, _operation) do
      Ecto.Changeset.cast(data, changes, @permitted_columns)
    end
  Defaults to the given model's changeset/2 function if defined, raises if no changeset function can be found.
- :pre_apply (pre_post_func/0) - an optional callback that allows for the pre-pending of operations to the Ecto.Multi representing a mutation transaction.
  It should be a 3-arity function.
  Arguments
  - multi - an empty %Ecto.Multi{} instance that you should apply your actions to
  - changeset - the changeset representing the individual mutation operation
  - context - the current change context
  The result should be the Ecto.Multi instance which will be merged with the one representing the mutation operation.
  Because every action in an Ecto.Multi must have a unique key, we advise using the operation_name/2 function to generate a unique operation name based on the context.
    def pre_apply(multi, changeset, context) do
      name = Phoenix.Sync.Writer.operation_name(context, :event_insert)
      Ecto.Multi.insert(multi, name, %Event{todo_id: id})
    end
  Defaults to nil.
- :post_apply (pre_post_func/0) - an optional callback function that allows for the appending of operations to the Ecto.Multi representing a mutation transaction.
  See the docs for :pre_apply for the function signature and arguments.
  Defaults to nil.
- :insert (operation_opts/0) - Callbacks for validating and modifying insert operations.
  Accepts definitions for the validate, pre_apply and post_apply functions for insert operations that will override the top-level equivalents.
  See the documentation for allow/3.
  The only difference with these callback functions is that the action parameter is redundant and therefore not passed.
  Defaults to [], using the top-level functions for all operations.
  - :validate (function of arity 2) - A 2-arity function that returns a changeset for the given mutation data.
    Arguments:
    - schema - the original Ecto.Schema model returned from the load function
    - changes - a map of changes from the mutation operation
    Return value: an Ecto.Changeset
  - :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi.
    Arguments and return value as per the global pre_apply callback.
  - :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi.
    Arguments and return value as per the global post_apply callback.
 
- :update (operation_opts/0) - Callbacks for validating and modifying update operations. See the documentation for insert.
  - :validate (function of arity 2) - A 2-arity function that returns a changeset for the given mutation data.
    Arguments:
    - schema - the original Ecto.Schema model returned from the load function
    - changes - a map of changes from the mutation operation
    Return value: an Ecto.Changeset
  - :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi.
    Arguments and return value as per the global pre_apply callback.
  - :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi.
    Arguments and return value as per the global post_apply callback.
 
- :delete (operation_opts/0) - Callbacks for validating and modifying delete operations. See the documentation for insert.
  - :validate
  - :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi.
    Arguments and return value as per the global pre_apply callback.
  - :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi.
    Arguments and return value as per the global post_apply callback.
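The matching rules described for the :table option above can be illustrated with a small, self-contained sketch. The `TableMatchSketch` module is hypothetical and only mirrors the documented behaviour. It is not the library's own matching code.

```elixir
defmodule TableMatchSketch do
  # Illustration of the documented :table matching rules only;
  # this is not the library's own matching code.

  # A bare table name ignores the schema prefix of the client relation.
  def matches?(table, [_schema, table]) when is_binary(table), do: true

  # A [schema, table] pair must match the client relation exactly.
  def matches?([schema, table], [schema, table]), do: true

  def matches?(_configured, _relation), do: false
end

TableMatchSketch.matches?("todos", ["application", "todos"])
TableMatchSketch.matches?(["public", "todos"], ["application", "todos"])
```

The first call matches because a bare table name ignores the schema prefix. The second does not, because a two-element list requires the full [schema, table] pair to be equal.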
 
@spec apply( t(), Phoenix.Sync.Writer.Format.transaction_data(), Ecto.Repo.t(), transact_opts() ) :: {:ok, txid(), Ecto.Multi.changes()} | Ecto.Multi.failure()
Ingest and write changes to the given repo in a single call.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.apply(changes, Repo, parser: &MyFormat.parse/1)
is equivalent to:
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.ingest(changes, parser: &MyFormat.parse/1)
|> Phoenix.Sync.Writer.transaction(Repo)
@spec ingest(t(), Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) :: t()
Add the given changes to the operations that will be applied within a transaction/3.
Examples:
{:ok, txid} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.ingest(changes, format: MyApp.MutationFormat)
  |> Phoenix.Sync.Writer.ingest(other_changes, parser: &MyApp.MutationFormat.parse_other/1)
  |> Phoenix.Sync.Writer.ingest(more_changes, parser: {MyApp.MutationFormat, :parse_more, []})
  |> Phoenix.Sync.Writer.transaction(MyApp.Repo)
Supported options:
- :format (Format.t()) - A module implementing the Phoenix.Sync.Writer.Format behaviour.
- :parser (Phoenix.Sync.Writer.Format.parser_fun() | mfa()) - A function that parses some input data and returns a %Transaction{} struct or an error. See Phoenix.Sync.Writer.Format.parse_transaction/1.
@spec new() :: t()
Create a new empty writer.
Empty writers will reject writes to any tables. You should configure writes
to the permitted tables by calling allow/3.
Return a unique operation name for use in pre_apply or post_apply callbacks.
Ecto.Multi requires that all operation names be unique within a
transaction. This function gives you a simple way to generate a name for your
own operations that is guaranteed not to conflict with any other.
Example:
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyModel,
  pre_apply: fn multi, changeset, context ->
    name = Phoenix.Sync.Writer.operation_name(context)
    Ecto.Multi.insert(multi, name, AuditEvent.for_changeset(changeset))
  end
)
Like operation_name/1 but allows for a custom label.
@spec parse_transaction(Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) :: {:ok, Phoenix.Sync.Writer.Transaction.t()} | {:error, term()}
Use the parser configured in the given Writer
instance to decode the given transaction data.
This can be used to handle mutation operations explicitly:
{:ok, txn} = Phoenix.Sync.Writer.parse_transaction(my_json_tx_data, format: Phoenix.Sync.Writer.Format.TanstackDB)
{:ok, txid} =
  Repo.transaction(fn ->
    Enum.each(txn.operations, fn operation ->
      # do something with the given operation
      # raise if something is wrong...
    end)
    # return the transaction id
    Phoenix.Sync.Writer.txid!(Repo)
  end)
@spec to_multi(t()) :: Ecto.Multi.t()
Given a writer configuration created using allow/3, translate the list of
mutations into an Ecto.Multi operation.
Example:
%Ecto.Multi{} = multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.allow(MyApp.Options.Option, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.ingest(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
  |> Phoenix.Sync.Writer.to_multi()
If you want to add extra operations to the mutation transaction, beyond those
applied by any pre_apply or post_apply callbacks in your mutation config, then use
the functions in Ecto.Multi to do those as normal.
Use transaction/3 to apply the changes to the database and return the
transaction id.
to_multi/1 builds an Ecto.Multi struct containing the operations required to
write the mutation operations to the database.
The order of operation is:
1. Parse
The transaction data is parsed, using either the format or the parser function
supplied in ingest/3.
2. Check
The user input data in each operation in the transaction is tested for validity
via the check function.
At this point no database operations have taken place. Errors at the parse or
check stage result in an early exit. The purpose of the check callback is to
sanity-check the incoming mutation data against basic sanitization rules, much
as you would do with Plug middleware and pattern matching on controller params.
Now that we have a list of validated mutation operations, the next step is:
3. Before-all
Perform any actions defined in the before_all callback.
This only happens once per transaction, the first time the model owning the callback is included in the operation list.
The following actions happen once per operation in the transaction:
4. Load
The load function is called to retrieve the source row from the database
(for update and delete operations), or the schema's __struct__/0
function is called to instantiate an empty struct (insert).
5. Validate
The validate function is called with the result of the load function
and the operation's changes.
6. Pre-apply
The pre_apply callback is called with a multi instance, the result of the
validate function and the current Context. The result is
merged into the transaction's ongoing Ecto.Multi.
7. Apply
The actual operation is applied to the database using one of
Ecto.Multi.insert/4, Ecto.Multi.update/4 or Ecto.Multi.delete/4.
8. Post-apply
Finally the post_apply callback is called.
Any error in any of these stages will abort the entire transaction and leave your database untouched.
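The per-operation part of this ordering can be modelled with plain functions. The following is a minimal sketch that uses only the standard library and is not how Phoenix.Sync.Writer is implemented: the PipelineSketch and Stages modules, the shape of the operation maps and the stage functions are all hypothetical, and the before_all, pre_apply and post_apply steps are left out for brevity. It shows each operation passing through check, load, validate and apply in turn, with the first error discarding everything accumulated so far.

```elixir
defmodule PipelineSketch do
  # Hypothetical model of the stage ordering; not Phoenix.Sync code.
  # Each stage receives the current value and returns {:ok, value} or
  # {:error, reason}.
  def run(operations, stages) do
    result =
      Enum.reduce_while(operations, {:ok, []}, fn op, {:ok, applied} ->
        case run_stages(op, stages) do
          {:ok, value} -> {:cont, {:ok, [value | applied]}}
          # The first failure throws away all previously applied results.
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)

    case result do
      {:ok, applied} -> {:ok, Enum.reverse(applied)}
      error -> error
    end
  end

  defp run_stages(op, stages) do
    Enum.reduce_while(stages, {:ok, op}, fn {name, fun}, {:ok, acc} ->
      case fun.(acc) do
        {:ok, next} -> {:cont, {:ok, next}}
        # Tag the error with the stage that produced it.
        {:error, reason} -> {:halt, {:error, {name, reason}}}
      end
    end)
  end
end

defmodule Stages do
  # Hypothetical stage functions standing in for the user callbacks.
  def check(%{changes: changes} = op) when is_map(changes), do: {:ok, op}
  def check(_op), do: {:error, :invalid_params}

  # Pretend to load the existing row from the database.
  def load(op), do: {:ok, Map.put(op, :row, %{id: op.id, title: "old"})}

  def validate(op) do
    case Map.get(op.changes, "title", "") do
      "" -> {:error, :title_required}
      _title -> {:ok, op}
    end
  end

  # Pretend to write the row by merging the change into it.
  def apply_change(op), do: {:ok, Map.put(op.row, :title, op.changes["title"])}
end

stages = [
  check: &Stages.check/1,
  load: &Stages.load/1,
  validate: &Stages.validate/1,
  apply: &Stages.apply_change/1
]

good_ops = [%{id: 1, changes: %{"title" => "a"}}, %{id: 2, changes: %{"title" => "b"}}]
bad_ops = [%{id: 1, changes: %{"title" => "a"}}, %{id: 2, changes: %{"title" => ""}}]

good = PipelineSketch.run(good_ops, stages)
bad = PipelineSketch.run(bad_ops, stages)
```

In the sketch the second list fails at the validate stage of its second operation, so none of its results are returned. In the real writer the equivalent guarantee comes from running the Ecto.Multi inside a database transaction.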
@spec to_multi(t(), Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) :: Ecto.Multi.t()
Ingest changes and map them into an Ecto.Multi instance ready to apply
using Phoenix.Sync.Writer.transaction/3 or Ecto.Repo.transaction/2.
This is a wrapper around ingest/3 and to_multi/1.
Example:
%Ecto.Multi{} = multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.allow(MyApp.Options.Option, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.to_multi(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
@spec transact( Phoenix.Sync.Writer.Format.transaction_data(), Ecto.Repo.t(), operation_fun :: (Phoenix.Sync.Writer.Operation.t() -> :ok | {:ok, any()} | {:error, any()}), transact_opts() ) :: {:ok, txid()} | {:error, any()}
Apply operations from a mutation directly via a transaction.
operation_fun is a 1-arity function that receives each of the
%Phoenix.Sync.Writer.Operation{} structs within the mutation data and
should apply them appropriately. It should return :ok or {:ok, result} if
successful or {:error, reason} if the operation is invalid or failed to
apply. If any operation returns {:error, _} or raises then the entire
transaction is aborted.
This function will return {:error, reason} if the transaction data fails to parse.
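The return contract for operation_fun can be tried out in isolation. The following is a minimal sketch under stated assumptions: TodoOps is a hypothetical module, and plain maps stand in for %Phoenix.Sync.Writer.Operation{} structs so that the code runs without the library. The keys it matches on (:operation, :relation, :data and :change) are the ones used in the example further down.

```elixir
defmodule TodoOps do
  # Hypothetical handler, suitable for passing as &TodoOps.handle/1.
  # A map pattern also matches a struct that has the same keys.
  def handle(%{operation: :insert, relation: [_schema, "todos"], change: change}) do
    # A real handler would write to the database here.
    {:ok, {:inserted, change}}
  end

  def handle(%{operation: :update, relation: [_schema, "todos"], data: data, change: change}) do
    {:ok, {:updated, Map.merge(data, change)}}
  end

  # Deletes are rejected. Returning an error tuple is what aborts the
  # surrounding transaction.
  def handle(%{operation: :delete}), do: {:error, "invalid delete"}

  # Catch-all clause so that an unexpected relation yields an error tuple
  # rather than a FunctionClauseError.
  def handle(_other), do: {:error, "unsupported operation"}
end

insert_op = %{operation: :insert, relation: ["public", "todos"], change: %{"title" => "write docs"}}
delete_op = %{operation: :delete, relation: ["public", "todos"], data: %{"id" => 1}}

insert_result = TodoOps.handle(insert_op)
delete_result = TodoOps.handle(delete_op)
```

Using a named function with a catch-all clause keeps unexpected tables from raising, at the cost of being a little more verbose than an anonymous function.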
{:ok, txid} =
  Phoenix.Sync.Writer.transact(
    my_encoded_txn,
    MyApp.Repo,
    fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        MyApp.Repo.insert(...)
      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        MyApp.Repo.update(Ecto.Changeset.cast(...))
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        {:error, "invalid delete"}
    end,
    format: Phoenix.Sync.Writer.Format.TanstackDB,
    timeout: 60_000
  )
Any of the opts not used by this module are passed on to the
Ecto.Repo.transaction/2 call.
This is equivalent to the below:
{:ok, txn} =
  Phoenix.Sync.Writer.parse_transaction(
    my_encoded_txn,
    format: Phoenix.Sync.Writer.Format.TanstackDB
  )
{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    Enum.each(txn.operations, fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        # insert a Todo
      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        # update a Todo
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        raise "invalid delete"
    end)
    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end, timeout: 60_000)
@spec transaction(t() | Ecto.Multi.t(), Ecto.Repo.t(), keyword()) :: {:ok, txid(), Ecto.Multi.changes()} | Ecto.Multi.failure()
Runs the mutation inside a transaction.
Since the mutation operation is expressed as an Ecto.Multi operation, see
the Ecto.Repo docs
for the result if any of your mutations returns an error.
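When the failed value is a changeset, its error messages carry %{key} placeholders alongside a keyword list of values. The following is a minimal sketch of the interpolation step on its own, using only the standard library. ErrorText is a hypothetical module name, and the message and options are made-up sample data.

```elixir
defmodule ErrorText do
  # Replace each %{key} placeholder in msg with the matching value from
  # opts. A placeholder with no matching option is replaced by its bare
  # key name.
  def interpolate(msg, opts) do
    Regex.replace(~r"%{(\w+)}", msg, fn _whole, key ->
      opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
    end)
  end
end

message = ErrorText.interpolate("should be at least %{count} character(s)", count: 3)
```

String.to_existing_atom/1 is used so that untrusted message text cannot create new atoms.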
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo)
|> Phoenix.Sync.Writer.allow(MyApp.Options.Option)
|> Phoenix.Sync.Writer.ingest(
  changes,
  format: Phoenix.Sync.Writer.Format.TanstackDB
)
|> Phoenix.Sync.Writer.transaction(MyApp.Repo)
|> case do
  {:ok, txid, _changes} ->
    # return the txid to the client
    Plug.Conn.send_resp(conn, 200, Jason.encode!(%{txid: txid}))
  {:error, _failed_operation, failed_value, _changes_so_far} ->
    # extract the error message from the changeset returned as `failed_value`
    error =
      Ecto.Changeset.traverse_errors(failed_value, fn {msg, opts} ->
        Regex.replace(~r"%{(\w+)}", msg, fn _, key ->
          opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
        end)
      end)
    Plug.Conn.send_resp(conn, 400, Jason.encode!(error))
  end
Also supports normal fun/0 or fun/1 style transactions much like
Ecto.Repo.transaction/2, returning the txid of the operation:
{:ok, txid, todo} =
  Phoenix.Sync.Writer.transaction(fn ->
    Repo.insert!(changeset)
  end, Repo)
@spec txid(Ecto.Multi.changes()) :: {:ok, txid()} | :error
Extract the transaction id from changes or from an Ecto.Repo within a
transaction.
This allows you to use a standard Ecto.Repo.transaction/2 call to apply
mutations defined using apply/2 and extract the transaction id afterwards.
Example
{:ok, changes} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo)
  |> Phoenix.Sync.Writer.allow(MyApp.Options.Option)
  |> Phoenix.Sync.Writer.to_multi(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
  |> MyApp.Repo.transaction()
{:ok, txid} = Phoenix.Sync.Writer.txid(changes)
It also allows you to get a transaction id from any active transaction:
MyApp.Repo.transaction(fn ->
  {:ok, txid} = Phoenix.Sync.Writer.txid(MyApp.Repo)
end)
Attempting to run txid/1 on a repo outside a transaction will return an
error.
@spec txid!(Ecto.Multi.changes()) :: txid()
Returns the transaction id or raises on an error.
See txid/1.