Asynchronous Processing with the Command Queue
DoubleEntryLedger submits work to an immutable Command table and processes it through a built-in command queue. You decide whether to wait for the projection to finish (CommandApi.process_from_params/2) or store the command and let the queue finish it in the background (CommandApi.create_from_params/1). This guide focuses on the asynchronous path.
How the queue is organized
- Command submission: Commands are written through DoubleEntryLedger.Apis.CommandApi. Each command carries a CommandQueueItem record with status :pending, :processing, :processed, :failed, :occ_timeout, or :dead_letter.
- Supervision: DoubleEntryLedger.CommandQueue.Supervisor starts the scheduler stack (registry, dynamic supervisors, and workers). InstanceMonitor polls for instances with pending commands and ensures each has an InstanceProcessor.
- Processing: An InstanceProcessor claims commands via optimistic locking (CommandQueue.Scheduling.claim_command_for_processing/2), invokes the appropriate worker module (create/update transaction or account), and writes the resulting JournalEvent, transactions, entries, balance history, and Oban link jobs.
- Retries: Failures trigger exponential backoff (configurable). Workers distinguish validation failures (marked as dead letters) from transient OCC or database errors (scheduled for retry). Exhausted retries land in :dead_letter for manual inspection.
Submitting commands asynchronously
Use the same request payload you would send synchronously but call CommandApi.create_from_params/1. The command is persisted, assigned a queue item, and returned immediately.
    alias DoubleEntryLedger.Apis.CommandApi

    command = %{
      "instance_address" => instance.address,
      "action" => "create_transaction",
      "source" => "billing",
      "source_idempk" => "async-payment-1",
      "payload" => %{
        status: :posted,
        entries: [
          %{"account_address" => cash.address, "amount" => 1_000_00, "currency" => :USD},
          %{"account_address" => revenue.address, "amount" => 1_000_00, "currency" => :USD}
        ]
      }
    }

    {:ok, processed_command} = CommandApi.create_from_params(command)
    processed_command.command_queue_item.status
    # => :pending

At this point the command is durable, but the associated transaction and journal event do not exist yet.
Monitoring processing
InstanceMonitor continuously scans for pending commands and spins up processors per instance. Processors transition commands through statuses:
- :pending → :processing when the worker claims the command.
- :processing → :processed when projections succeed.
- :processing → :failed, :occ_timeout, or :dead_letter when something goes wrong.
Use DoubleEntryLedger.Stores.CommandStore to inspect queue progress:
    alias DoubleEntryLedger.Stores.CommandStore

    # processed_command is the record returned by CommandApi.create_from_params/1
    command = CommandStore.get_by_id(processed_command.id)
    command.command_queue_item.status

    CommandStore.list_all_for_instance(instance.id, page: 1, per_page: 20)

When you need the resulting transaction or account, wait until the CommandQueueItem shows :processed, then query the projections normally (e.g., TransactionStore.get_by_id/1, AccountStore.get_by_address/2, or JournalEventStore helpers).
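Waiting for :processed usually means polling. The following is a minimal sketch of such a loop, not something the library ships: the module name QueuePollSketch, the attempt count, and the interval are all hypothetical. It takes any zero-arity function that returns the current status, so it stays independent of how the queue item is loaded. It treats :processed and :dead_letter as final and keeps polling on :failed and :occ_timeout, since the guide describes those as retried.

```elixir
defmodule QueuePollSketch do
  @moduledoc """
  Hypothetical helper (not part of DoubleEntryLedger): polls a status
  function until the queue item reaches a final state or attempts run out.
  """

  # Statuses treated as final in this sketch. :failed and :occ_timeout are
  # described as retried, so the loop keeps polling when it sees them.
  @terminal [:processed, :dead_letter]

  @doc """
  `fetch_status` is a zero-arity function returning the current status atom.
  Returns `{:ok, status}` for a final status, or `{:error, :timeout}`.
  """
  def await(fetch_status, attempts \\ 20, interval_ms \\ 250)

  def await(_fetch_status, 0, _interval_ms), do: {:error, :timeout}

  def await(fetch_status, attempts, interval_ms) when attempts > 0 do
    status = fetch_status.()

    if status in @terminal do
      {:ok, status}
    else
      Process.sleep(interval_ms)
      await(fetch_status, attempts - 1, interval_ms)
    end
  end
end

# Possible usage with the store from this guide. It assumes the queue item
# is loaded on the returned command; command_id is a placeholder.
#
#   QueuePollSketch.await(fn ->
#     CommandStore.get_by_id(command_id).command_queue_item.status
#   end)
```

A caller that receives {:ok, :dead_letter} should inspect the queue item rather than query the projections, because no transaction was written.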
Configuration knobs
Tuning happens under the :command_queue config namespace (kept for backwards compatibility):
    config :double_entry_ledger, :command_queue,
      poll_interval: 5_000,
      max_retries: 5,
      base_retry_delay: 30,
      max_retry_delay: 3_600,
      processor_name: "command_queue"

- poll_interval – how often InstanceMonitor looks for pending work.
- max_retries, base_retry_delay, max_retry_delay – OCC/backoff behaviour.
- processor_name – used in queue item metadata to identify workers.
Oban configuration lives separately in config :double_entry_ledger, Oban, ... and controls how many link jobs run concurrently.
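To get a feel for how the three retry settings interact, here is a sketch of one common exponential backoff scheme: the delay doubles on each attempt and is capped at the maximum. This guide does not state the library's actual formula or the units of these values, so the module RetryBackoffSketch and its doubling rule are assumptions for illustration only.

```elixir
defmodule RetryBackoffSketch do
  @moduledoc """
  Illustrative only: a doubling backoff capped at a maximum.
  The library's real formula and units may differ.
  """

  @doc "Delay before retry `attempt` (1-based): base * 2^(attempt - 1), capped at `max`."
  def delay(attempt, base, max) when attempt >= 1 do
    min(base * Integer.pow(2, attempt - 1), max)
  end

  @doc "Delays for every retry allowed by a keyword list with the retry keys."
  def schedule(config) do
    base = Keyword.fetch!(config, :base_retry_delay)
    max = Keyword.fetch!(config, :max_retry_delay)
    max_retries = Keyword.fetch!(config, :max_retries)

    # The //1 step keeps the range empty when max_retries is 0.
    for attempt <- 1..max_retries//1, do: delay(attempt, base, max)
  end
end

config = [max_retries: 5, base_retry_delay: 30, max_retry_delay: 3_600]
IO.inspect(RetryBackoffSketch.schedule(config))
# prints [30, 60, 120, 240, 480]
```

Under this assumed scheme the cap only starts to matter from the eighth attempt onward (30 * 2^7 = 3_840), so with max_retries: 5 the value of max_retry_delay is never reached.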
Error handling and retries
- Validation errors (bad payloads, missing accounts, unbalanced entries) mark the command as :dead_letter with the reason recorded on the queue item. They are not retried.
- Optimistic concurrency conflicts (stale account/transaction rows) mark the queue item as :occ_timeout, which is retried automatically.
- Unexpected exceptions mark the queue item as :failed and are retried using exponential backoff until max_retries is reached.
- Manual intervention: Inspect the recorded errors array on CommandQueueItem or the PendingTransactionLookup table when updates fail because the original transaction is still pending.
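The rules above can be read as a small decision table. The sketch below encodes them as pattern-matched function clauses. The outcome names (:validation_error, :occ_conflict, :exception) and the module OutcomeSketch are invented for this illustration and are not library identifiers. It also assumes that OCC retries count against max_retries in the same way as exceptions, which this guide implies but does not state.

```elixir
defmodule OutcomeSketch do
  @moduledoc """
  Hypothetical decision table for a failed processing attempt.
  Outcome atoms are made up for this sketch.
  """

  @doc """
  Returns `{status, :retry | :stop}`.
  `retries` is the number of retries already used.
  """
  # Validation failures are never retried.
  def next_step(:validation_error, _retries, _max_retries) do
    {:dead_letter, :stop}
  end

  # Retryable outcomes that have exhausted their budget become dead letters.
  # Assumption: OCC conflicts share the max_retries budget with exceptions.
  def next_step(outcome, retries, max_retries)
      when outcome in [:occ_conflict, :exception] and retries >= max_retries do
    {:dead_letter, :stop}
  end

  # Stale rows: marked :occ_timeout and retried.
  def next_step(:occ_conflict, _retries, _max_retries), do: {:occ_timeout, :retry}

  # Unexpected exceptions: marked :failed and retried with backoff.
  def next_step(:exception, _retries, _max_retries), do: {:failed, :retry}
end
```

Clause order matters here: the exhaustion clause must come before the two retry clauses, otherwise a command that has used up its retries would be scheduled again.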
Summary
- Queue commands via CommandApi.create_from_params/1; each command is immutable and idempotent.
- CommandQueueItem tracks the background lifecycle; workers process commands per instance with OCC and retries.
- Monitor queue state through CommandStore and read projections through the existing stores once the queue item reaches :processed.
- Tune throughput and retry behaviour via the :command_queue config and Oban settings.
For more details, explore: