DoubleEntryLedger.Occ.Processor behaviour (double_entry_ledger v0.1.0)

View Source

Behavior and default implementation for Optimistic Concurrency Control (OCC) in command processing.

This module provides:

  • A behaviour defining four callbacks:
    • build_transaction/3
    • handle_build_transaction/3
    • handle_transaction_map_error/3
    • handle_occ_final_timeout/2
  • A process_with_retry/2 implementation that:
    • Converts command data to a transaction map
    • Builds an Ecto.Multi via build_multi/3
    • Retries on Ecto.StaleEntryError with exponential backoff
    • Calls handle_occ_final_timeout/2 when retries are exhausted
  • Helper imports for backoff, error tracking, and scheduling.

Usage

defmodule MyCommandProcessor do
  use DoubleEntryLedger.Occ.Processor

  @impl true
  def build_transaction(command, tx_map, repo) do
    Ecto.Multi.new()
    |> Ecto.Multi.insert(:transaction, Transaction.changeset(%Transaction{}, tx_map))
  end

  @impl true
  def handle_build_transaction(multi, command, _repo), do: multi

  @impl true
  def handle_transaction_map_error(command, error, _repo) do
    Ecto.Multi.new()
    |> Ecto.Multi.update(
      :event_failure,
      Command.changeset(command, %{status: :failed, errors: [inspect(error)]})
    )
  end

  @impl true
  def handle_occ_final_timeout(command, _repo) do
    Ecto.Multi.new()
    |> Ecto.Multi.update(
      :event_dead_letter,
      Command.changeset(command, %{status: :dead_letter})
    )
  end
end

Summary

Callbacks

Builds an Ecto.Multi transaction for processing a command.

Allows further customization of the Ecto.Multi after the base transaction steps.

Handles the case when OCC retries are exhausted.

Handles errors that occur when converting command data to a transaction map.

Process the command with retry mechanisms.

Callbacks

build_transaction(t, transaction_map, t, t)

Builds an Ecto.Multi transaction for processing a command.

This callback must be implemented by modules using the OccProcessor behavior. It defines how to construct the database transaction operations needed to process the command and its associated transaction data.

Required Transaction Steps

The Multi must include specific named steps depending on the input type:

  • :create_command (required for TransactionCommandMap) - Must return the created Command struct when processing the TransactionCommandMap
  • :transaction (required) - Must return the saved Transaction struct and it must handle the Ecto.StaleEntryError and return it as the error for the Multi.failure()
  • :event (required) - Must return the saved Command struct when processing the Command

Parameters

  • occable_item: A Command struct or TransactionCommandMap containing the command details to process
  • transaction_map: A map of transaction data derived from the command map
  • repo: The Ecto repository to use for database operations

Returns

  • An Ecto.Multi struct containing all the operations to execute atomically

handle_build_transaction(t, t, t)

@callback handle_build_transaction(
  Ecto.Multi.t(),
  DoubleEntryLedger.Occ.Occable.t(),
  Ecto.Repo.t()
) ::
  Ecto.Multi.t()

Allows further customization of the Ecto.Multi after the base transaction steps.

This callback can be used to add additional steps or modify the Multi before execution.

Parameters

  • multi: The Ecto.Multi built by build_transaction/3
  • occable_item: The command or command map being processed
  • repo: The Ecto repository

Returns

handle_occ_final_timeout(t, t)

@callback handle_occ_final_timeout(
  DoubleEntryLedger.Occ.Occable.t(),
  Ecto.Repo.t()
) :: Ecto.Multi.t()

Handles the case when OCC retries are exhausted.

This callback should return an Ecto.Multi that marks the command as permanently failed.

Parameters

  • occable_item: The command or command map being processed
  • repo: The Ecto repository

Returns

  • An Ecto.Multi that updates the command as dead letter or timed out

handle_transaction_map_error(t, any, t)

@callback handle_transaction_map_error(
  DoubleEntryLedger.Occ.Occable.t(),
  any(),
  Ecto.Repo.t()
) :: Ecto.Multi.t()

Handles errors that occur when converting command data to a transaction map.

This callback should return an Ecto.Multi that updates the Command to reflect the error.

Parameters

  • occable_item: The command or command map being processed
  • error: The error encountered during transaction map conversion
  • repo: The Ecto repository

Returns

  • An Ecto.Multi that updates the command with error information

process_with_retry(t, t)

@callback process_with_retry(DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t()) ::
  {:ok,
   %{
     transaction: DoubleEntryLedger.Transaction.t(),
     event_success: DoubleEntryLedger.Command.t()
   }}
  | {:ok, %{event_failure: DoubleEntryLedger.Command.t()}}
  | Ecto.Multi.failure()

Process the command with retry mechanisms.

This callback has a default implementation through the using macro.

process_with_retry_no_save_on_error(t, t)

@callback process_with_retry_no_save_on_error(
  DoubleEntryLedger.Occ.Occable.t(),
  Ecto.Repo.t()
) ::
  {:ok,
   %{
     transaction: DoubleEntryLedger.Transaction.t(),
     event_success: DoubleEntryLedger.Command.t()
   }}
  | Ecto.Multi.failure()