DoubleEntryLedger.Occ.Processor behaviour (double_entry_ledger v0.1.0)
View SourceBehavior and default implementation for Optimistic Concurrency Control (OCC) in command processing.
This module provides:
- A behaviour defining four callbacks:
build_transaction/3handle_build_transaction/3handle_transaction_map_error/3handle_occ_final_timeout/2
- A
process_with_retry/2implementation that:- Converts command data to a transaction map
- Builds an Ecto.Multi via
build_multi/3 - Retries on
Ecto.StaleEntryErrorwith exponential backoff - Calls
handle_occ_final_timeout/2when retries are exhausted
- Helper imports for backoff, error tracking, and scheduling.
Usage
defmodule MyCommandProcessor do
use DoubleEntryLedger.Occ.Processor
@impl true
def build_transaction(command, tx_map, repo) do
Ecto.Multi.new()
|> Ecto.Multi.insert(:transaction, Transaction.changeset(%Transaction{}, tx_map))
end
@impl true
def handle_build_transaction(multi, command, _repo), do: multi
@impl true
def handle_transaction_map_error(command, error, _repo) do
Ecto.Multi.new()
|> Ecto.Multi.update(
:event_failure,
Command.changeset(command, %{status: :failed, errors: [inspect(error)]})
)
end
@impl true
def handle_occ_final_timeout(command, _repo) do
Ecto.Multi.new()
|> Ecto.Multi.update(
:event_dead_letter,
Command.changeset(command, %{status: :dead_letter})
)
end
end
Summary
Callbacks
Builds an Ecto.Multi transaction for processing a command.
Allows further customization of the Ecto.Multi after the base transaction steps.
Handles the case when OCC retries are exhausted.
Handles errors that occur when converting command data to a transaction map.
Process the command with retry mechanisms.
Callbacks
@callback build_transaction( DoubleEntryLedger.Occ.Occable.t(), DoubleEntryLedger.Workers.CommandWorker.TransactionCommandTransformer.transaction_map(), Ecto.UUID.t(), Ecto.Repo.t() ) :: Ecto.Multi.t()
Builds an Ecto.Multi transaction for processing a command.
This callback must be implemented by modules using the OccProcessor behavior. It defines how to construct the database transaction operations needed to process the command and its associated transaction data.
Required Transaction Steps
The Multi must include specific named steps depending on the input type:
:create_command(required for TransactionCommandMap) - Must return the created Command struct when processing the TransactionCommandMap:transaction(required) - Must return the saved Transaction struct and it must handle the Ecto.StaleEntryError and return it as the error for the Multi.failure():event(required) - Must return the saved Command struct when processing the Command
Parameters
occable_item: A Command struct or TransactionCommandMap containing the command details to processtransaction_map: A map of transaction data derived from the command maprepo: The Ecto repository to use for database operations
Returns
- An
Ecto.Multistruct containing all the operations to execute atomically
@callback handle_build_transaction( Ecto.Multi.t(), DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t() ) :: Ecto.Multi.t()
Allows further customization of the Ecto.Multi after the base transaction steps.
This callback can be used to add additional steps or modify the Multi before execution.
Parameters
multi: The Ecto.Multi built bybuild_transaction/3occable_item: The command or command map being processedrepo: The Ecto repository
Returns
- An updated
Ecto.Multi
@callback handle_occ_final_timeout( DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t() ) :: Ecto.Multi.t()
Handles the case when OCC retries are exhausted.
This callback should return an Ecto.Multi that marks the command as permanently failed.
Parameters
occable_item: The command or command map being processedrepo: The Ecto repository
Returns
- An
Ecto.Multithat updates the command as dead letter or timed out
@callback handle_transaction_map_error( DoubleEntryLedger.Occ.Occable.t(), any(), Ecto.Repo.t() ) :: Ecto.Multi.t()
Handles errors that occur when converting command data to a transaction map.
This callback should return an Ecto.Multi that updates the Command to reflect the error.
Parameters
occable_item: The command or command map being processederror: The error encountered during transaction map conversionrepo: The Ecto repository
Returns
- An
Ecto.Multithat updates the command with error information
@callback process_with_retry(DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t()) :: {:ok, %{ transaction: DoubleEntryLedger.Transaction.t(), event_success: DoubleEntryLedger.Command.t() }} | {:ok, %{event_failure: DoubleEntryLedger.Command.t()}} | Ecto.Multi.failure()
Process the command with retry mechanisms.
This callback has a default implementation through the using macro.
@callback process_with_retry_no_save_on_error( DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t() ) :: {:ok, %{ transaction: DoubleEntryLedger.Transaction.t(), event_success: DoubleEntryLedger.Command.t() }} | Ecto.Multi.failure()