Phoenix.Sync.Writer (Phoenix.Sync v0.4.4)

Provides write-path sync support for Phoenix- or Plug-based apps.

Imagine you're building an application on sync. You've used the read-path sync utilities to sync data into the front-end. If the client then changes the data locally, these writes can be batched up and sent back to the server.

Phoenix.Sync.Writer provides a principled way of ingesting these local writes and applying them to Postgres, in a way that works with and re-uses your existing authorization logic, Ecto.Schemas and Ecto.Changeset validation functions.

This allows you to build instant, offline-capable applications that work with local optimistic state.

Controller example

For example, take a project management app that's using @TanStack/db to batch up local optimistic writes and POST them to the Phoenix.Controller below:

defmodule MutationController do
  use Phoenix.Controller, formats: [:json]

  alias Phoenix.Sync.Writer
  alias Phoenix.Sync.Writer.Format

  def mutate(conn, %{"transaction" => transaction} = _params) do
    user_id = conn.assigns.user_id

    {:ok, txid, _changes} =
      Writer.new()
      |> Writer.allow(
        Projects.Project,
        check: &reject_invalid_params/1,
        load: &Projects.load_for_user(&1, user_id),
        validate: &Projects.Project.changeset/2
      )
      |> Writer.allow(
        Projects.Issue
        # Use the sensible defaults:
        # validate: &Projects.Issue.changeset/2
        # etc.
      )
      |> Writer.apply(
        transaction,
        Repo,
        format: Format.TanstackDB
      )

    json(conn, %{txid: txid})
  end
end

The controller constructs a Phoenix.Sync.Writer instance and pipes it through a series of allow/3 calls, registering functions against Ecto.Schemas (in this case Projects.Project and Projects.Issue) to validate and authorize each of these mutation operations before applying them as a single transaction.

This controller can become a single, unified entry point for ingesting writes into your application. You can extend the pipeline with allow/3 calls for every schema that you'd like to be able to ingest changes to.

The check, load, validate, etc. callbacks to the allow function are designed to allow you to re-use your authorization and validation logic from your existing Plug middleware and Ecto.Changeset functions.

Warning

The mutation operations received from clients MUST be considered as untrusted.

Though the HTTP operation that uploaded them will have been authenticated and authorized by your existing Plug middleware as usual, the actual content of the request that is turned into writes against your database needs to be validated very carefully against the privileges of the current user.

That's what Phoenix.Sync.Writer is for: specifying which resources can be updated and registering functions to authorize and validate the mutation payload.

Usage levels (high, mid, low)

You don't need to use Phoenix.Sync.Writer to ingest write operations using Phoenix. Phoenix already ships with primitives like Ecto.Multi and Ecto.Repo.transaction/2. However, Phoenix.Sync.Writer provides:

  • a number of convenience functions that simplify ingesting mutation operations
  • a high-level pipeline that dries up a lot of common boilerplate and allows you to re-use your existing Plug and Ecto.Changeset logic

High-level usage

The controller example above uses a higher level pipeline that dries up common boilerplate, whilst still allowing flexibility and extensibility. You create an ingest pipeline by instantiating a Phoenix.Sync.Writer instance and piping into allow/3 and apply/4 calls:

{:ok, txid, _changes} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.allow(MyApp.OtherSchema)
  |> Phoenix.Sync.Writer.apply(transaction, Repo, format: MyApp.MutationFormat)

Or, instead of apply/4, you can use separate calls to ingest/3 and then transaction/2. This allows you to ingest multiple formats, for example:

{:ok, txid} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.ingest(changes, format: MyApp.MutationFormat)
  |> Phoenix.Sync.Writer.ingest(other_changes, parser: &MyApp.MutationFormat.parse_other/1)
  |> Phoenix.Sync.Writer.ingest(more_changes, parser: {MyApp.MutationFormat, :parse_more, []})
  |> Phoenix.Sync.Writer.transaction(MyApp.Repo)

And at any point you can drop down / eject out to the underlying Ecto.Multi using to_multi/1 or to_multi/3:

multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.to_multi(changes, format: MyApp.MutationFormat)

# ... do anything you like with the multi ...

{:ok, changes} = Repo.transaction(multi)
{:ok, txid} = Phoenix.Sync.Writer.txid(changes)

Mid-level usage

The pattern above uses a lower-level transact/4 function. This abstracts the mechanical details of transaction management whilst still allowing you to handle and apply mutation operations yourself:

{:ok, txid} =
  Phoenix.Sync.Writer.transact(
    my_encoded_txn,
    MyApp.Repo,
    fn
      %{operation: :insert, relation: [_, "todos"], changes: changes} ->
        MyApp.Repo.insert(...)
      %{operation: :update, relation: [_, "todos"], data: data, changes: changes} ->
        MyApp.Repo.update(Ecto.Changeset.cast(...))
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        {:error, "invalid delete"}
    end,
    format: Phoenix.Sync.Writer.Format.TanstackDB,
    timeout: 60_000
  )

However, with larger applications, this flexibility can become tiresome as you end up repeating boilerplate and defining your own pipeline to authorize, validate and apply changes with the right error handling and return values.

Low-level usage (DIY)

For the more advanced cases, if you're comfortable parsing, validating and persisting changes yourself then the simplest way to use Phoenix.Sync.Writer is to use txid!/1 within Ecto.Repo.transaction/2:

{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    # ... save your changes to the database ...

    # Return the transaction id.
    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end)

This returns the database transaction ID that the changes were applied within. You can return it to the client, which can then monitor the read-path sync stream to detect when the transaction syncs through, at which point the client can discard its local optimistic state.

A convenient way of doing this is to parse the request data into a list of Phoenix.Sync.Writer.Operations using a Phoenix.Sync.Writer.Format. You can then apply the changes yourself by matching on the operation data:

{:ok, %Phoenix.Sync.Writer.Transaction{operations: operations}} =
  Phoenix.Sync.Writer.parse_transaction(
    my_encoded_txn,
    format: Phoenix.Sync.Writer.Format.TanstackDB
  )

{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    Enum.each(operations, fn
      %{operation: :insert, relation: [_, "todos"], changes: changes} ->
        # insert a Todo built from `changes`, raising on failure
        # (`:ok` is a placeholder for your own code)
        :ok

      %{operation: :update, relation: [_, "todos"], data: data, changes: changes} ->
        # update the Todo identified by `data` with `changes`
        :ok

      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # for example, if you don't want to allow deletes...
        raise "invalid delete"
    end)

    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end, timeout: 60_000)

Transactions

The txid in the return value from apply/4 and txid/1 / txid!/1 allows the Postgres transaction ID to be returned to the client in the response data.

This allows clients to monitor the read-path sync stream and match on the arrival of the same transaction id. When the client receives this transaction id back through its sync stream, it knows that it can discard the local optimistic state for that transaction. (This is a more robust way of managing optimistic state than just matching on instance IDs, as it allows for local changes to be rebased on concurrent changes to the same data from other users.)
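The bookkeeping described above can be sketched in a few lines. This is only an illustration of the idea, written in Elixir for consistency with the rest of this page even though real clients are usually JavaScript. The `OptimisticStoreSketch` module and its function names are hypothetical and are not part of Phoenix.Sync.

```elixir
defmodule OptimisticStoreSketch do
  @moduledoc "Hypothetical client-side bookkeeping, keyed by transaction id."

  # The state maps a txid to the local changes still awaiting confirmation.
  def new, do: %{}

  # Record the txid returned by the server alongside the local changes.
  def track(pending, txid, changes), do: Map.put(pending, txid, changes)

  # Called for every txid observed on the read-path sync stream.
  # Unknown txids (for example other users' writes) leave the state untouched.
  def confirm(pending, txid), do: Map.delete(pending, txid)
end

pending =
  OptimisticStoreSketch.new()
  |> OptimisticStoreSketch.track(1001, [%{"id" => 1, "title" => "draft"}])
  |> OptimisticStoreSketch.track(1002, [%{"id" => 2, "title" => "other"}])

# txid 1001 arrives on the sync stream, so only 1002 remains pending.
pending = OptimisticStoreSketch.confirm(pending, 1001)
IO.inspect(Map.keys(pending))
```

Keying the state by transaction id rather than by row id is what lets the client drop exactly the optimistic changes that the server has confirmed.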

Phoenix.Sync.Writer uses Ecto.Multi's transaction update mechanism under the hood, which means that either all the operations in a client transaction are accepted or none are. See to_multi/1 for how you can hook into the Ecto.Multi after applying your change data.

Compatibility

Phoenix.Sync.Writer can only return transaction ids when connecting to a Postgres database (a repo with adapter: Ecto.Adapters.Postgres). You can use this module for other databases, but the returned txid will be nil.

Client Libraries

Phoenix.Sync.Writer is not coupled to any particular client-side implementation. Instead, it provides an adapter pattern where you can register a format adapter or parser function to parse the payload format of a client-side library into the struct that Phoenix.Sync.Writer expects.

See Electric's write pattern guides and example code for implementation strategies and examples.

The currently supported format adapters are:

  • TanStack/db "A reactive client store for building super fast apps on sync"

    Integration:

    Phoenix.Sync.Writer.new()
    |> Phoenix.Sync.Writer.ingest(mutation_data, format: Phoenix.Sync.Writer.Format.TanstackDB)
    |> Phoenix.Sync.Writer.transaction(Repo)

Usage

Much as every controller action must be authenticated, authorized and validated to prevent users writing invalid data or data that they do not have permission to modify, mutations MUST be validated for both correctness (are the given values valid?) and permissions (is the current user allowed to apply the given mutation?).

This dual verification -- of data and permissions -- is performed by a pipeline of application-defined callbacks for every model that you allow writes to:

  • check - a function that performs a "pre-flight" sanity check of the user-provided data in the mutation; this should just validate the data and not usually hit the database; checks are performed on all operations in a transaction before proceeding to the next steps in the pipeline; this allows for fast rejection of invalid data before performing more expensive operations
  • load - a function that takes the original data and returns the existing model from the database, if it exists, for an update or delete operation
  • validate - create and validate an Ecto.Changeset from the source data and mutation changes; this is intended to be compatible with using existing schema changeset functions; note that, as per any changeset function, the validate function can perform both authorization and validation
  • pre_apply and post_apply - add arbitrary Ecto.Multi operations to the transaction based on the current operation

See apply/4 and the Callbacks for how the transaction is processed internally and how best to use these callback functions to express your app's authorization and validation requirements.

Calling new/0 creates an empty writer configuration. On its own this does not permit any mutations: to allow writes from clients you must call allow/3 with a schema module and, optionally, some callback functions.

# create an empty writer configuration
writer = Phoenix.Sync.Writer.new()

# allow writes to the `Todos.Todo` table
# using `Todos.check_mutation/1` to validate mutation data before
# touching the database
writer = Phoenix.Sync.Writer.allow(writer, Todos.Todo, check: &Todos.check_mutation/1)

If the table name on the client differs from the Postgres table, then you can add a table option that specifies the client table name that this allow/3 call applies to:

# `client_todos` is the name of the `todos` table on the clients
writer =
  Phoenix.Sync.Writer.allow(
    writer,
    Todos.Todo,
    validate: &Todos.validate_mutation/2,
    table: "client_todos"
  )

Callbacks

Check

The check option should be a 1-arity function whose purpose is to test the mutation data against the authorization rules for the application and model before attempting any database access.

It should return :ok if the changes are valid, or {:error, reason} if they are invalid.

If any of the changes fail the auth test, then the entire transaction will be rejected.

This is the first line of defence against malicious writes as it provides a quick check of the data from the clients before any reads or writes to the database.

Note that the writer pipeline checks all the operations before proceeding to load, validate and apply each operation in turn.

def check(%Phoenix.Sync.Writer.Operation{} = operation) do
  # :ok or {:error, "..."}
end
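As a minimal sketch of such a check, the function below inspects only the operation itself and never touches the database. The `TodoChecksSketch` module, the permitted column list and the extra `user_id` argument are assumptions made for illustration. The clauses match on plain maps with `:operation` and `:changes` keys, so they would also accept a `%Phoenix.Sync.Writer.Operation{}` struct.

```elixir
defmodule TodoChecksSketch do
  @moduledoc "Hypothetical pre-flight checks; no database access."

  # Columns a client may send. An assumption for this sketch.
  @permitted ~w(id title completed user_id)

  # Matches any map or struct with :operation and :changes keys.
  def check(%{operation: operation, changes: changes}, user_id)
      when operation in [:insert, :update] do
    unexpected = Map.keys(changes) -- @permitted

    cond do
      unexpected != [] ->
        {:error, "unexpected columns: " <> Enum.join(unexpected, ", ")}

      # A client may only write rows that belong to itself.
      Map.get(changes, "user_id", user_id) != user_id ->
        {:error, "invalid user_id"}

      true ->
        :ok
    end
  end

  # Deletes carry no changes to inspect in this sketch.
  def check(%{operation: :delete}, _user_id), do: :ok
end
```

It could then be registered with something like `check: &TodoChecksSketch.check(&1, user_id)`, in the same way as the captures used elsewhere on this page.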

Load

The load callback takes the data in update or delete mutations (i.e. the original data before changes), and uses it to retrieve the original Ecto.Schema struct from the database.

It can be a 1- or 2-arity function. The 1-arity version receives just the data parameters. The 2-arity version receives the Ecto.Repo that the transaction is being applied to and the data parameters.

# 1-arity version
def load(%{"column" => "value"} = data) do
  # Repo.get(...)
end

# 2-arity version
def load(repo, %{"column" => "value"} = data) do
  # repo.get(...)
end

If not provided, the load function defaults to using Ecto.Repo.get_by/3 with the primary key(s) defined on the model.

For insert operations this load function is not used. Instead, the original struct is created by calling the __struct__/0 function on the Ecto.Schema module.
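To make the difference between the two arities concrete, here is a small sketch of how a caller could dispatch on the arity of a load callback. It is illustrative only and does not claim to show the library's internals. `LoadSketch` and `FakeRepo` are hypothetical names, and `FakeRepo` is a map-backed stand-in so the sketch runs without a database.

```elixir
defmodule LoadSketch do
  @moduledoc "Illustrative only; not the library's internal code."

  # The 1-arity version receives just the data parameters.
  def call(load, _repo, data) when is_function(load, 1), do: load.(data)

  # The 2-arity version receives the repo and the data parameters.
  def call(load, repo, data) when is_function(load, 2), do: load.(repo, data)
end

defmodule FakeRepo do
  @moduledoc "A stand-in for an Ecto.Repo, backed by a compile-time map."

  @rows %{"1" => %{id: "1", title: "write docs"}}

  def get(id), do: Map.get(@rows, id)
end

by_data = fn %{"id" => id} -> FakeRepo.get(id) end
by_repo = fn repo, %{"id" => id} -> repo.get(id) end

# Both calls return the same row; the second uses the repo it was handed.
IO.inspect(LoadSketch.call(by_data, FakeRepo, %{"id" => "1"}))
IO.inspect(LoadSketch.call(by_repo, FakeRepo, %{"id" => "1"}))
```

The 2-arity form is the one to reach for when the same callback has to work against whichever repo the transaction is applied to.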

Validate

The validate callback performs the usual role of a changeset function: to validate the changes against the model's data constraints using the functions in Ecto.Changeset.

It should return an Ecto.Changeset instance (or possibly the original schema struct in the case of deletes). If any of the transaction's changesets are marked as invalid, then the entire transaction is aborted.

If not specified, the validate function is defaulted to the schema model's standard changeset/2 function if available.

The callback can be either a 2- or 3-arity function.

The 2-arity version will receive the Ecto.Schema struct returned from the load function and the mutation changes. The 3-arity version will receive the loaded struct, the changes and the operation.

# 2-arity version
def changeset(%Todo{} = data, %{} = changes) do
  data
  |> Ecto.Changeset.cast(changes, [:title, :completed])
  |> Ecto.Changeset.validate_required([:title])
end

# 3-arity version
def changeset(%Todo{} = data, %{} = changes, operation)
    when operation in [:insert, :update, :delete] do
  # ...
end

Primary keys

Whether the params for insert operations contain a value for the new primary key is application specific. It's certainly not required if you have declared your Ecto.Schema model with a primary key set to autogenerate: true.

It's worth noting that if you are accepting primary key values as part of your insert changes, then you should use UUID primary keys for your models to prevent conflicts.

pre_apply and post_apply

These callbacks, run before or after the actual insert, update or delete operation, allow for the addition of side effects to the transaction.

They are passed an empty Ecto.Multi struct, which is then merged into the writer's transaction.

They also allow for more validation/authorization steps as any operation within the callback that returns an "invalid" operation will abort the entire transaction.

def pre_or_post_apply(%Ecto.Multi{} = multi, %Ecto.Changeset{} = change, %Phoenix.Sync.Writer.Context{} = context) do
  multi
  # add some side-effects
  # |> Ecto.Multi.run(Phoenix.Sync.Writer.operation_name(context, :image), fn _repo, _changes ->
  #   with :ok <- File.write(image.name, image.contents) do
  #     {:ok, nil}
  #   end
  # end)
  #
  # validate the current transaction and abort using an {:error, value} tuple
  # |> Ecto.Multi.run(Phoenix.Sync.Writer.operation_name(context, :my_validation), fn _repo, _changes ->
  #   {:error, "reject entire transaction"}
  # end)
end

Note the use of operation_name/2 when adding operations. Every name in the final Ecto.Multi struct must be unique; operation_name/2 generates names that are guaranteed to be unique to the current operation and callback.

Per-operation callbacks

If you want to re-use an existing function on a per-operation basis, then in your write configuration you can define both top-level and per operation callbacks:

Phoenix.Sync.Writer.allow(
  writer,
  Todos.Todo,
  load: &Todos.fetch_for_user(&1, user_id),
  check: &Todos.check_mutation(&1, user_id),
  validate: &Todos.Todo.changeset/2,
  update: [
    # for inserts and deletes, use &Todos.Todo.changeset/2 but for updates
    # use this function
    validate: &Todos.Todo.update_changeset/2,
    pre_apply: &Todos.pre_apply_update_todo/3
  ],
  insert: [
    # optional validate, pre_apply and post_apply
    # overrides for insert operations
  ],
  delete: [
    # optional validate, pre_apply and post_apply
    # overrides for delete operations
  ]
)
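The precedence between top-level and per-operation callbacks shown above can be pictured as a keyword-list lookup. The sketch below is an illustration of that precedence only, not the library's implementation. `CallbackLookupSketch` is a hypothetical module, and atoms stand in for the function captures.

```elixir
defmodule CallbackLookupSketch do
  @moduledoc "Illustrates override precedence only; not the library's code."

  # Callbacks that can be overridden per operation.
  @overridable [:validate, :pre_apply, :post_apply]

  # Prefer the per-operation entry, falling back to the top-level one.
  def resolve(opts, operation, name)
      when operation in [:insert, :update, :delete] and name in @overridable do
    opts
    |> Keyword.get(operation, [])
    |> Keyword.get(name, Keyword.get(opts, name))
  end
end

# Atoms stand in for the function captures used in the example above.
opts = [
  validate: :changeset,
  update: [validate: :update_changeset, pre_apply: :pre_apply_update_todo]
]

IO.inspect(CallbackLookupSketch.resolve(opts, :update, :validate))
# => :update_changeset
IO.inspect(CallbackLookupSketch.resolve(opts, :insert, :validate))
# => :changeset
IO.inspect(CallbackLookupSketch.resolve(opts, :insert, :pre_apply))
# => nil
```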

End-to-end usage

The combination of the check, load, validate, pre_apply and post_apply functions can be composed to provide strong guarantees of validity.

The aim is to provide an interface as similar to that used in controller functions as possible.

Here we show an example controller module that allows updating of Todos via a standard HTTP PUT update handler and also via HTTP POSTs to the mutation handler which applies optimistic writes via this module.

We use the load function to validate the ownership of the original Todo by looking up the data using both the id and the user_id. This makes it impossible for user a to update todos belonging to user b.

defmodule MyController do
  use Phoenix.Controller, formats: [:html, :json]

  import Ecto.Query, only: [from: 2]

  alias Phoenix.Sync.Writer

  # The standard HTTP PUT update handler
  def update(conn, %{"todo" => todo_params}) do
    user_id = conn.assigns.user_id

    with %Todos.Todo{} = todo <- fetch_for_user(todo_params, user_id),
         :ok <- validate_params(todo_params, user_id),
         {:ok, updated_todo} <- Todos.update(todo, todo_params) do
      redirect(conn, to: ~p"/todos/#{updated_todo.id}")
    end
  end

  # The HTTP POST mutations handler which receives JSON data
  def mutations(conn, %{"transaction" => transaction} = _params) do
    user_id = conn.assigns.user_id

    {:ok, txid, _changes} =
      Writer.new()
      |> Writer.allow(
        Todos.Todo,
        check: &validate_mutation(&1, user_id),
        load: &fetch_for_user(&1, user_id)
      )
      |> Writer.apply(transaction, Repo, format: Writer.Format.TanstackDB)

    json(conn, %{txid: txid})
  end

  # Included here for completeness but in a real app would be a
  # public function in the Todos context.
  # Because we're validating the ownership of the Todo here we add an
  # extra layer of auth checks, preventing one user from modifying
  # the Todos of another.
  defp fetch_for_user(%{"id" => id}, user_id) do
    from(t in Todos.Todo, where: t.id == ^id and t.user_id == ^user_id)
    |> Repo.one()
  end

  defp validate_mutation(%Writer.Operation{} = op, user_id) do
    with :ok <- validate_params(op.data, user_id) do
      validate_params(op.changes, user_id)
    end
  end

  defp validate_params(%{"user_id" => user_id}, user_id), do: :ok
  defp validate_params(%{} = _params, _user_id), do: {:error, "invalid user_id"}
end

Because Phoenix.Sync.Writer leverages Ecto.Multi to do the work of applying changes and managing errors, you're also free to extend the actions that are performed with every transaction using pre_apply and post_apply callbacks configured per-table or per-table per-action (insert, update, delete). See allow/3 for more information on the configuration options for each table.

The result of to_multi/1 or to_multi/3 is an Ecto.Multi instance so you can also just append operations using the normal Ecto.Multi functions:

{:ok, txid, _changes} =
  Writer.new()
  |> Writer.allow(Todo, ...)
  |> Writer.to_multi(transaction, parser: &my_transaction_parser/1)
  |> Ecto.Multi.insert(:my_action, %Event{})
  |> Writer.transaction(Repo)

Summary

Functions

  • allow/3 - Allow writes to the given Ecto.Schema.
  • apply/4 - Ingest and write changes to the given repo in a single call.
  • ingest/3 - Add the given changes to the operations that will be applied within a transaction/3.
  • new/0 - Create a new empty writer.
  • operation_name/1 - Return a unique operation name for use in pre_apply or post_apply callbacks.
  • operation_name/2 - Like operation_name/1 but allows for a custom label.
  • parse_transaction/2 - Use the format or parser given in the options to decode the given transaction data.
  • to_multi/1 - Given a writer configuration created using allow/3, translate the list of mutations into an Ecto.Multi operation.
  • to_multi/3 - Ingest changes and map them into an Ecto.Multi instance ready to apply using Phoenix.Sync.Writer.transaction/3 or Ecto.Repo.transaction/2.
  • transact/4 - Apply operations from a mutation directly via a transaction.
  • transaction/3 - Runs the mutation inside a transaction.
  • txid/1 - Extract the transaction id from changes or from an Ecto.Repo within a transaction.
  • txid!/1 - Returns a transaction id or raises on an error.

Types

allow_opts()

@type allow_opts() :: [
  table: String.t() | [String.t(), ...],
  accept: [operation(), ...],
  check: (Phoenix.Sync.Writer.Operation.t() -> :ok | {:error, term()}),
  before_all: (Ecto.Multi.t() -> Ecto.Multi.t()),
  load:
    (Ecto.Repo.t(), data() ->
       Ecto.Schema.t() | {:ok, Ecto.Schema.t()} | nil | {:error, String.t()})
    | (data() ->
         Ecto.Schema.t() | {:ok, Ecto.Schema.t()} | nil | {:error, String.t()}),
  validate:
    (Ecto.Schema.t(), data() -> Ecto.Changeset.t())
    | (Ecto.Schema.t(), data(), operation() -> Ecto.Changeset.t()),
  pre_apply: pre_post_func(),
  post_apply: pre_post_func(),
  insert: operation_opts(),
  update: operation_opts(),
  delete: operation_opts()
]

context()

@type context() :: %Phoenix.Sync.Writer.Context{
  action: term(),
  callback: :load | :validate | :pre_apply | :post_apply,
  changes: Ecto.Multi.changes(),
  index: non_neg_integer(),
  operation: :insert | :update | :delete,
  pk: term(),
  schema: Ecto.Schema.t(),
  writer: term()
}

data()

@type data() :: %{required(binary()) => any()}

ingest_change()

mutation()

@type mutation() :: %{required(binary()) => any()}

operation()

@type operation() :: :insert | :update | :delete

operation_opts()

@type operation_opts() :: [
  validate: (Ecto.Schema.t(), data() -> Ecto.Changeset.t()),
  pre_apply: pre_post_func(),
  post_apply: pre_post_func()
]

parse_opts()

@type parse_opts() :: [
  format: Phoenix.Sync.Writer.Format.t(),
  parser: Phoenix.Sync.Writer.Format.parser_fun()
]

pre_post_func()

@type pre_post_func() :: (Ecto.Multi.t(), Ecto.Changeset.t(), context() ->
                      Ecto.Multi.t())

repo_transaction_opts()

@type repo_transaction_opts() :: keyword()

schema_config()

@type schema_config() :: %{required(atom()) => term()}

t()

@type t() :: %Phoenix.Sync.Writer{
  ingest: [ingest_change()],
  mappings: %{required(binary() | [binary(), ...]) => schema_config()}
}

transact_opts()

@type transact_opts() :: [parse_opts() | repo_transaction_opts()]

txid()

Functions

allow(writer, schema, opts \\ [])

@spec allow(t(), module(), allow_opts()) :: t()

Allow writes to the given Ecto.Schema.

Only tables specified in calls to allow/3 will be accepted by later calls to transaction/3. Any changes to tables not explicitly defined by allow/3 calls will be rejected and cause the entire transaction to be rejected.

Examples

# allow writes to the Todo table using
# `MyApp.Todos.Todo.check_mutation/1` to validate operations
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyApp.Todos.Todo,
  check: &MyApp.Todos.check_mutation/1
)

# A more complex configuration adding a `post_apply` callback to inserts
# and using a custom query to load the original database value.
Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyApp.Todos.Todo,
  load: &MyApp.Todos.get_for_mutation/1,
  check: &MyApp.Todos.check_mutation/1,
  insert: [
    post_apply: &MyApp.Todos.post_apply_insert_mutation/3
  ]
)

Supported options

  • :table (String.t() | [String.t(), ...]) - Override the table name of the Ecto.Schema struct to allow for mapping between table names on the client and within Postgres.

    If you pass just a table name, then any schema prefix in the client tables is ignored, so

    Writer.allow(Todos, table: "todos")

    will match client operations for ["public", "todos"] and ["application", "todos"] etc.

    If you provide a 2-element list then the mapping will be exact and only client relations matching the full [schema, table] pair will match the given schema.

    Writer.allow(Todos, table: ["public", "todos"])

    Will match client operations for ["public", "todos"] but not ["application", "todos"] etc.

    Defaults to Model.__schema__(:source) or, if the Ecto schema module has specified a namespace, to [Model.__schema__(:prefix), Model.__schema__(:source)].

  • :accept - A list of actions to accept.

    A transaction containing an operation not in the accept list will be rejected.

    Defaults to accepting all operations, [:insert, :update, :delete].

  • :check (function of arity 1) - A function that validates every %Phoenix.Sync.Writer.Operation{} in the transaction for correctness.

    This is run before any database access is performed and so provides an efficient way to prevent malicious writes without hitting your database.

    Defaults to a function that allows all operations: fn _ -> :ok end.

  • :before_all (function of arity 1) - Run only once (per transaction) after the parsing and check callback have completed and before load and validate functions run.

    Useful for pre-loading data from the database that can be shared across all operation callbacks for all the mutations.

    Arguments: an Ecto.Multi struct (see the before_all entry in allow_opts/0).

    Return value: an Ecto.Multi struct.

    Defaults to no callback.

  • :load - A 1- or 2-arity function that accepts either the mutation operation's data or an Ecto.Repo instance and the mutation data and returns the original row from the database.

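    Arguments:

    • repo (2-arity version only) - the Ecto.Repo that the transaction is being applied to
    • data - the original data from the mutation operation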

    Valid return values are:

    • struct() - an Ecto.Schema struct, that must match the module passed to allow/3
    • {:ok, struct()} - as above but wrapped in an :ok tuple
    • nil - if no row matches the search criteria, or
    • {:error, String.t()} - as nil but with a custom error string

    A return value of nil or {:error, reason} will abort the transaction.

    This function is only used for updates or deletes. For inserts, the __struct__/0 function defined by Ecto.Schema is used to create an empty schema struct.

    Examples

    # load from a known Repo
    load: fn %{"id" => id} -> MyApp.Repo.get(Todos.Todo, id) end

    # load from the repo passed to `Phoenix.Sync.Writer.transaction/2`
    load: fn repo, %{"id" => id} -> repo.get(Todos.Todo, id) end

    If not provided defaults to Ecto.Repo.get_by/3 using the table's schema module and its primary keys.

  • :validate - a 2- or 3-arity function that returns an Ecto.Changeset for a given mutation.

    Callback params

    • data - an Ecto.Schema struct of the schema passed to allow/3, as returned from the load function
    • changes - a map of changes to apply to the data
    • operation (for 3-arity callbacks only) - the operation action, one of :insert, :update or :delete

    At absolute minimum, this should call Ecto.Changeset.cast/3 to validate the proposed data:

    def my_changeset(data, changes, _operation) do
      Ecto.Changeset.cast(data, changes, @permitted_columns)
    end

    Defaults to the given model's changeset/2 function if defined; raises if no changeset function can be found.

  • :pre_apply (pre_post_func/0) - an optional callback that allows for the pre-pending of operations to the Ecto.Multi representing a mutation transaction.

    It should be a 3-arity function.

    Arguments

    • multi - an empty %Ecto.Multi{} instance that you should apply your actions to
    • changeset - the changeset representing the individual mutation operation
    • context - the current change context

    The result should be the Ecto.Multi instance which will be merged with the one representing the mutation operation.

    Because every action in an Ecto.Multi must have a unique key, we advise using the operation_name/2 function to generate a unique operation name based on the context.

    def pre_apply(multi, changeset, context) do
      name = Phoenix.Sync.Writer.operation_name(context, :event_insert)
      # assumes the schema's primary key is `:id`
      id = Ecto.Changeset.get_field(changeset, :id)
      Ecto.Multi.insert(multi, name, %Event{todo_id: id})
    end

    Defaults to nil.

  • :post_apply (pre_post_func/0) - an optional callback function that allows for the appending of operations to the Ecto.Multi representing a mutation transaction.

    See the docs for :pre_apply for the function signature and arguments.

    Defaults to nil.

  • :insert (operation_opts/0) - Callbacks for validating and modifying insert operations.

    Accepts definitions for the validate, pre_apply and post_apply functions for insert operations that will override the top-level equivalents.

    See the documentation for allow/3.

    The only difference with these callback functions is that the action parameter is redundant and therefore not passed.

    Defaults to [], using the top-level functions for all operations.

    • :validate (function of arity 2) - A 2-arity function that returns a changeset for the given mutation data.

      Arguments:

      • schema the original Ecto.Schema model returned from the load function
      • changes a map of changes from the mutation operation

      Return value: an Ecto.Changeset.

    • :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi.

      Arguments and return value as per the global pre_apply callback.

    • :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi.

      Arguments and return value as per the global post_apply callback.

  • :update (operation_opts/0) - Callbacks for validating and modifying update operations. See the documentation for insert.

    • :validate (function of arity 2) - A 2-arity function that returns a changeset for the given mutation data.

      Arguments:

      • schema the original Ecto.Schema model returned from the load function
      • changes a map of changes from the mutation operation

      Return value: an Ecto.Changeset.

    • :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi.

      Arguments and return value as per the global pre_apply callback.

    • :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi.

      Arguments and return value as per the global post_apply callback.

  • :delete (operation_opts/0) - Callbacks for validating and modifying delete operations. See the documentation for insert.

    • :validate

    • :pre_apply (pre_post_func/0) - An optional callback that allows for the pre-pending of operations to the Ecto.Multi.

      Arguments and return value as per the global pre_apply callback.

    • :post_apply (pre_post_func/0) - An optional callback that allows for the appending of operations to the Ecto.Multi.

      Arguments and return value as per the global post_apply callback.
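The matching rules described for the :table option above can be expressed as a small predicate. This is a sketch of the documented behaviour under the assumption that client relations arrive as [schema, table] pairs. `TableMatchSketch` is a hypothetical module and not how the library is necessarily implemented.

```elixir
defmodule TableMatchSketch do
  @moduledoc "Sketch of the documented :table matching rules."

  # A bare table name matches any schema prefix sent by the client.
  def matches?(table, [_prefix, table]) when is_binary(table), do: true

  # A [schema, table] pair must match the client relation exactly.
  def matches?([schema, table], [schema, table]), do: true

  def matches?(_table_option, _relation), do: false
end

IO.inspect(TableMatchSketch.matches?("todos", ["public", "todos"]))
# => true
IO.inspect(TableMatchSketch.matches?("todos", ["application", "todos"]))
# => true
IO.inspect(TableMatchSketch.matches?(["public", "todos"], ["application", "todos"]))
# => false
```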

apply(writer, changes, repo, opts)

Ingest and write changes to the given repo in a single call.

Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.apply(changes, Repo, parser: &MyFormat.parse/1)

is equivalent to:

Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.ingest(changes, parser: &MyFormat.parse/1)
|> Phoenix.Sync.Writer.transaction(Repo)

ingest(writer, changes, opts)

Add the given changes to the operations that will be applied within a transaction/3.

Examples:

{:ok, txid} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todo)
  |> Phoenix.Sync.Writer.ingest(changes, format: MyApp.MutationFormat)
  |> Phoenix.Sync.Writer.ingest(other_changes, parser: &MyApp.MutationFormat.parse_other/1)
  |> Phoenix.Sync.Writer.ingest(more_changes, parser: {MyApp.MutationFormat, :parse_more, []})
  |> Phoenix.Sync.Writer.transaction(MyApp.Repo)

Supported options:

new()

@spec new() :: t()

Create a new empty writer.

Empty writers will reject writes to any tables. You should configure writes to the permitted tables by calling allow/3.

operation_name(ctx)

Return a unique operation name for use in pre_apply or post_apply callbacks.

Ecto.Multi requires that all operation names be unique within a transaction. This function gives you a simple way to generate a name for your own operations that is guaranteed not to conflict with any other.

Example:

Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(
  MyModel,
  pre_apply: fn multi, changeset, context ->
    name = Phoenix.Sync.Writer.operation_name(context)
    Ecto.Multi.insert(multi, name, AuditEvent.for_changeset(changeset))
  end
)

operation_name(ctx, label)

@spec operation_name(context(), term()) :: term()

Like operation_name/1 but allows for a custom label.

parse_transaction(changes, opts)

@spec parse_transaction(Phoenix.Sync.Writer.Format.transaction_data(), parse_opts()) ::
  {:ok, Phoenix.Sync.Writer.Transaction.t()} | {:error, term()}

Decode the given transaction data using the format or parser given in opts.

This can be used to handle mutation operations explicitly:

{:ok, txn} = Phoenix.Sync.Writer.parse_transaction(my_json_tx_data, format: Phoenix.Sync.Writer.Format.TanstackDB)

{:ok, txid} =
  Repo.transaction(fn ->
    Enum.each(txn.operations, fn operation ->
      # do something with the given operation
      # raise if something is wrong...
    end)
    # return the transaction id
    Phoenix.Sync.Writer.txid!(Repo)
  end)

to_multi(writer)

@spec to_multi(t()) :: Ecto.Multi.t()

Given a writer configuration created using allow/3, translate the list of mutations into an Ecto.Multi operation.

Example:

%Ecto.Multi{} = multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.allow(MyApp.Options.Option, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.ingest(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
  |> Phoenix.Sync.Writer.to_multi()

If you want to add extra operations to the mutation transaction, beyond those applied by any pre_apply or post_apply callbacks in your mutation config, use the functions in Ecto.Multi to add them as normal.

Use transaction/3 to apply the changes to the database and return the transaction id.

to_multi/1 builds an Ecto.Multi struct containing the operations required to write the mutation operations to the database.

The order of operation is:

1. Parse

The transaction data is parsed, using either the format or the parser function supplied in ingest/3.

2. Check

The user input data in each operation in the transaction is tested for validity via the check function.

At this point no database operations have taken place. Errors at the parse or check stage result in an early exit. The purpose of the check callback is to sanity-check the incoming mutation data against basic sanitization rules, much as you would with Plug middleware and pattern matching on controller params.

Now that we have a list of validated mutation operations, the next step is:

3. Before-all

Perform any actions defined in the before_all callback.

This only happens once per transaction, the first time the model owning the callback is included in the operation list.

The following actions happen once per operation in the transaction:

4. Load

The load function is called to retrieve the source row from the database (for update and delete operations), or the schema's __struct__/0 function is called to instantiate an empty struct (insert).

5. Validate

The validate function is called with the result of the load function and the operation's changes.

6. Pre-apply

The pre_apply callback is called with a multi instance, the result of the validate function and the current Context. The result is merged into the transaction's ongoing Ecto.Multi.

7. Apply

The actual operation is applied to the database using one of Ecto.Multi.insert/4, Ecto.Multi.update/4 or Ecto.Multi.delete/4.

8. Post-apply

Finally the post_apply callback is called.

Any error in any of these stages will abort the entire transaction and leave your database untouched.
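
As a rough illustration of the check stage, the sketch below shows a callback that rejects operations before any database access takes place. It assumes the callback receives one operation at a time and returns :ok or {:error, reason}, and that operations expose the operation and relation fields pattern-matched in the transact/4 examples. CheckSketch and the table names are hypothetical.

```elixir
defmodule CheckSketch do
  # Hypothetical allow-list of table names; adjust to your own schemas.
  @allowed_tables ["todos", "options"]

  # Inserts and updates on a known table pass the check.
  def check(%{operation: op, relation: [_schema, table]})
      when op in [:insert, :update] and table in @allowed_tables do
    :ok
  end

  # Deletes are refused outright, before any row is loaded.
  def check(%{operation: :delete}), do: {:error, "deletes are not allowed"}

  # Anything else (unknown table, malformed operation) is refused too.
  def check(_operation), do: {:error, "unrecognised operation"}
end
```

A function like this could then be passed as check: &CheckSketch.check/1 in an allow/3 call.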

to_multi(writer, changes, opts)

Ingest changes and map them into an Ecto.Multi instance ready to apply using Phoenix.Sync.Writer.transaction/3 or Ecto.Repo.transaction/2.

This is a wrapper around ingest/3 and to_multi/1.

Example:

%Ecto.Multi{} = multi =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.allow(MyApp.Options.Option, check: &my_check_function/1)
  |> Phoenix.Sync.Writer.to_multi(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)

transact(changes, repo, operation_fun, opts)

@spec transact(
  Phoenix.Sync.Writer.Format.transaction_data(),
  Ecto.Repo.t(),
  operation_fun :: (Phoenix.Sync.Writer.Operation.t() ->
                      :ok | {:ok, any()} | {:error, any()}),
  transact_opts()
) :: {:ok, txid()} | {:error, any()}

Apply operations from a mutation directly via a transaction.

operation_fun is a 1-arity function that receives each of the %Phoenix.Sync.Writer.Operation{} structs within the mutation data and should apply them appropriately. It should return :ok or {:ok, result} if successful or {:error, reason} if the operation is invalid or failed to apply. If any operation returns {:error, _} or raises then the entire transaction is aborted.

This function will return {:error, reason} if the transaction data fails to parse.

{:ok, txid} =
  Phoenix.Sync.Writer.transact(
    my_encoded_txn,
    MyApp.Repo,
    fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        MyApp.Repo.insert(...)
      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        MyApp.Repo.update(Ecto.Changeset.cast(...))
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        {:error, "invalid delete"}
    end,
    format: Phoenix.Sync.Writer.Format.TanstackDB,
    timeout: 60_000
  )

Any of the opts not used by this module are passed onto the Ecto.Repo.transaction/2 call.

This is equivalent to the below:

{:ok, txn} =
  Phoenix.Sync.Writer.parse_transaction(
    my_encoded_txn,
    format: Phoenix.Sync.Writer.Format.TanstackDB
  )

{:ok, txid} =
  MyApp.Repo.transaction(fn ->
    Enum.each(txn.operations, fn
      %{operation: :insert, relation: [_, "todos"], change: change} ->
        # insert a Todo
      %{operation: :update, relation: [_, "todos"], data: data, change: change} ->
        # update a Todo
      %{operation: :delete, relation: [_, "todos"], data: data} ->
        # we don't allow deletes...
        raise "invalid delete"
    end)
    Phoenix.Sync.Writer.txid!(MyApp.Repo)
  end, timeout: 60_000)
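
Note one difference between the two forms: transact/4 aborts when operation_fun returns {:error, reason}, whereas the hand-rolled version has to raise (or roll back explicitly) to abort. A small helper along the following lines could bridge the two, so that a function written to the operation_fun return convention can be reused inside a plain Repo.transaction/2 block. This is only a sketch; OperationResultSketch and ok_or_raise!/1 are hypothetical names and not part of Phoenix.Sync.

```elixir
defmodule OperationResultSketch do
  # Successful returns, with or without a result, are collapsed to :ok.
  def ok_or_raise!(:ok), do: :ok
  def ok_or_raise!({:ok, _result}), do: :ok

  # An error tuple becomes an exception, which aborts the enclosing transaction.
  def ok_or_raise!({:error, reason}) do
    raise "operation failed: #{inspect(reason)}"
  end
end
```

Inside the Enum.each/2 call, each operation would then be passed through your own handler (a hypothetical handle_operation/1, say) and the result piped into OperationResultSketch.ok_or_raise!/1.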

transaction(writer_or_multi, repo, opts \\ [])

@spec transaction(t() | Ecto.Multi.t(), Ecto.Repo.t(), keyword()) ::
  {:ok, txid(), Ecto.Multi.changes()} | Ecto.Multi.failure()

Runs the mutation inside a transaction.

Since the mutation operation is expressed as an Ecto.Multi operation, see the Ecto.Repo docs for the result if any of your mutations returns an error.

Phoenix.Sync.Writer.new()
|> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo)
|> Phoenix.Sync.Writer.allow(MyApp.Options.Option)
|> Phoenix.Sync.Writer.ingest(
  changes,
  format: Phoenix.Sync.Writer.Format.TanstackDB
)
|> Phoenix.Sync.Writer.transaction(MyApp.Repo)
|> case do
  {:ok, txid, _changes} ->
    # return the txid to the client
    Plug.Conn.send_resp(conn, 200, Jason.encode!(%{txid: txid}))
  {:error, _failed_operation, failed_value, _changes_so_far} ->
    # extract the error message from the changeset returned as `failed_value`
    error =
      Ecto.Changeset.traverse_errors(failed_value, fn {msg, opts} ->
        Regex.replace(~r"%{(\w+)}", msg, fn _, key ->
          opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
        end)
      end)
    Plug.Conn.send_resp(conn, 400, Jason.encode!(error))
end
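
The error-formatting callback above substitutes %{key} placeholders in a message with values from the error's options. The following is a standalone sketch of just that step. ErrorMessageSketch is a hypothetical module name, and the message and options are made-up sample data in the {msg, opts} shape that Ecto.Changeset.traverse_errors/2 passes to its callback.

```elixir
defmodule ErrorMessageSketch do
  # Replace each %{key} placeholder in msg with the matching value from opts.
  # Placeholders with no matching option are replaced by the bare key name.
  def interpolate({msg, opts}) do
    Regex.replace(~r"%{(\w+)}", msg, fn _whole, key ->
      opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
    end)
  end
end

# Sample data only; real messages come from your changeset validations.
ErrorMessageSketch.interpolate({"should be at least %{count} character(s)", [count: 3]})
# => "should be at least 3 character(s)"
```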

Also supports normal fun/0 or fun/1 style transactions much like Ecto.Repo.transaction/2, returning the txid of the operation:

{:ok, txid, todo} =
  Phoenix.Sync.Writer.transaction(fn ->
    Repo.insert!(changeset)
  end, Repo)

txid(changes)

@spec txid(Ecto.Multi.changes()) :: {:ok, txid()} | :error

Extract the transaction id from changes or from an Ecto.Repo within a transaction.

This allows you to use a standard Ecto.Repo.transaction/2 call to apply an Ecto.Multi built with to_multi/1 or to_multi/3 and extract the transaction id afterwards.

Example

{:ok, changes} =
  Phoenix.Sync.Writer.new()
  |> Phoenix.Sync.Writer.allow(MyApp.Todos.Todo)
  |> Phoenix.Sync.Writer.allow(MyApp.Options.Option)
  |> Phoenix.Sync.Writer.to_multi(changes, format: Phoenix.Sync.Writer.Format.TanstackDB)
  |> MyApp.Repo.transaction()

{:ok, txid} = Phoenix.Sync.Writer.txid(changes)

It also allows you to get a transaction id from any active transaction:

MyApp.Repo.transaction(fn ->
  {:ok, txid} = Phoenix.Sync.Writer.txid(MyApp.Repo)
end)

Attempting to run txid/1 on a repo outside a transaction will return an error.

txid!(changes)

@spec txid!(Ecto.Multi.changes()) :: txid()

Returns the transaction id or raises on error.

See txid/1.