Version migration engine for PhoenixMicro.Schema.
When a consumer receives a message tagged with an older schema version,
the Migrator chains the schema module's migrate/2 callbacks to bring
the payload up to the current version.
How it works
Given:
payload_version = 1
current_version = 3
compatible_versions = [1, 2]
The migrator calls:
payload
|> then(&schema.migrate(1, &1)) # 1 → 2
|> then(&schema.migrate(2, &1)) # 2 → 3

Each migrate/2 implementation only needs to handle a single hop.
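The chaining described above can be sketched as a reduction over the version range. This is a minimal illustration, not the library's actual implementation: `MigratorSketch` and `DemoSchema` are hypothetical names, and the current version is passed in explicitly because the way a schema module exposes it is not shown here.

```elixir
defmodule MigratorSketch do
  # Hypothetical stand-in for the real Migrator. The current version is an
  # explicit argument (an assumption made to keep the sketch self-contained).
  def migrate(_schema, from, current, _payload) when from > current,
    do: {:error, {:future_version, from}}

  # Payload is already at the current version: nothing to do.
  def migrate(_schema, current, current, payload), do: {:ok, payload}

  def migrate(schema, from, current, payload) do
    # One migrate/2 call per hop: from -> from + 1, ..., current - 1 -> current.
    Enum.reduce_while(from..(current - 1), {:ok, payload}, fn version, {:ok, acc} ->
      try do
        {:cont, {:ok, schema.migrate(version, acc)}}
      rescue
        e -> {:halt, {:error, {:migration_failed, version, e}}}
      end
    end)
  end
end

defmodule DemoSchema do
  # Each clause handles exactly one hop and records it in the payload.
  def migrate(1, payload), do: Map.put(payload, "hops", ["1->2"])
  def migrate(2, payload), do: Map.update!(payload, "hops", &(&1 ++ ["2->3"]))
end

{:ok, migrated} = MigratorSketch.migrate(DemoSchema, 1, 3, %{"id" => "p1"})
IO.inspect(migrated)
```

Stopping at the first failing hop with `Enum.reduce_while/3` keeps the `{:ok, map} | {:error, term}` result shape while recording which version failed.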
Example
defmodule MyApp.Events.PaymentCreated do
  use PhoenixMicro.Schema

  schema_version 3
  topic "payments.created"

  field :payment_id, :string, required: true
  field :amount_cents, :integer, required: true
  field :currency, :string, required: true, default: "USD"

  # Tell the system which older versions we can handle
  compatible_with [1, 2]

  # v1 stored amount in dollars; convert to cents
  def migrate(1, payload) do
    payload
    |> Map.update("amount", 0, fn a -> round(a * 100) end)
    |> Map.put_new("currency", "USD")
  end

  # v2 renamed payment_ref → payment_id
  def migrate(2, payload) do
    {ref, rest} = Map.pop(payload, "payment_ref")
    Map.put(rest, "payment_id", ref)
  end
end
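To see what each hop does to a concrete payload, the two migrate/2 clauses from the example can be run by hand. The sketch below copies them into a plain module (the hypothetical `PaymentCreatedHops`, without `use PhoenixMicro.Schema`) so it runs on its own; the sample v1 payload is made up for illustration.

```elixir
defmodule PaymentCreatedHops do
  # Copies of the example's migrate/2 clauses, detached from the schema DSL.
  def migrate(1, payload) do
    payload
    |> Map.update("amount", 0, fn a -> round(a * 100) end)
    |> Map.put_new("currency", "USD")
  end

  def migrate(2, payload) do
    {ref, rest} = Map.pop(payload, "payment_ref")
    Map.put(rest, "payment_id", ref)
  end
end

# Hypothetical v1 payload: amount in dollars, no currency, old reference key.
v1 = %{"payment_ref" => "pay_123", "amount" => 12.5}

v2 = PaymentCreatedHops.migrate(1, v1)
v3 = PaymentCreatedHops.migrate(2, v2)

IO.inspect(v2, label: "after 1 -> 2")
IO.inspect(v3, label: "after 2 -> 3")
```

After both hops the payload carries "payment_id", "currency", and an integer "amount" of 1250. The converted value stays under the "amount" key, since neither clause in the example renames it to "amount_cents".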
Summary
Functions
migrate/3: Migrates a payload from from_version to the schema's current version.
version_from_headers/1: Extracts the schema version from message headers. Returns nil if no version header is present.
Functions
@spec migrate(module(), pos_integer(), map()) :: {:ok, map()} | {:error, term()}
Migrates a payload from from_version to the schema's current version.
Returns {:ok, migrated_payload} or {:error, reason}.
@spec version_from_headers(map()) :: pos_integer() | nil
Extracts the schema version from message headers.
Returns nil if no version header is present.
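A header lookup with this contract might look like the sketch below. The header name "x-schema-version" and the acceptance of both integer and numeric-string values are assumptions for illustration; the key and value format PhoenixMicro actually uses are not stated here.

```elixir
defmodule HeaderVersionSketch do
  # Hypothetical header name; the real key is not documented in this section.
  @header "x-schema-version"

  # Returns a positive integer version, or nil when the header is missing
  # or does not hold a valid version.
  def version_from_headers(headers) when is_map(headers) do
    case Map.get(headers, @header) do
      v when is_integer(v) and v > 0 ->
        v

      v when is_binary(v) ->
        case Integer.parse(v) do
          {n, ""} when n > 0 -> n
          _ -> nil
        end

      _ ->
        nil
    end
  end
end

IO.inspect(HeaderVersionSketch.version_from_headers(%{"x-schema-version" => "2"}))
IO.inspect(HeaderVersionSketch.version_from_headers(%{}))
```

A caller would then typically treat nil as "no version information" and decide separately whether to assume the current version or reject the message.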