PhoenixMicro.Schema behaviour (PhoenixMicro v1.0.0)

Copy Markdown View Source

Typed message schema contracts with version negotiation and backward-compatibility enforcement.

Sub-modules:

Defining a schema

defmodule MyApp.Events.PaymentCreated do
  use PhoenixMicro.Schema

  schema_version 2
  topic "payments.created"

  field :payment_id,   :string,  required: true
  field :amount_cents, :integer, required: true
  field :currency,     :string,  required: true, default: "USD"
  field :metadata,     :map,     required: false

  compatible_with [1]

  def migrate(1, payload) do
    {amount, rest} = Map.pop(payload, "amount")
    Map.put(rest, "amount_cents", round(amount * 100))
  end
end

Publishing with schema validation

PhoenixMicro.Schema.publish(MyApp.Events.PaymentCreated, %{
  payment_id: "pay_123",
  amount_cents: 9999,
  currency: "USD"
})

Decoding in consumers

def handle(message, _ctx) do
  {:ok, event} =
    PhoenixMicro.Schema.decode(MyApp.Events.PaymentCreated,
      message.payload, message.headers)
  process(event)
  :ok
end

Summary

Callbacks

Migrates a payload from an older version to the current version.

Validates a raw map against the schema.

Functions

Returns all registered schema modules.

Decodes and validates a raw payload against a schema, migrating if needed.

Validates and publishes a message with schema metadata in headers. Returns {:error, {:schema_validation_failed, errors}} if validation fails.

Returns the schema module registered for a given topic.

Types

field_def()

@type field_def() :: {atom(), field_type(), field_opts()}

field_opts()

@type field_opts() :: [required: boolean(), default: term(), description: String.t()]

field_type()

@type field_type() ::
  :string | :integer | :float | :boolean | :map | :list | :atom | :any

validation_error()

@type validation_error() :: {atom(), String.t()}

Callbacks

compatible_versions()

@callback compatible_versions() :: [pos_integer()]

fields()

@callback fields() :: [field_def()]

migrate(from_version, payload)

(optional)
@callback migrate(from_version :: pos_integer(), payload :: map()) :: map()

Migrates a payload from an older version to the current version.

schema_version()

@callback schema_version() :: pos_integer()

topic()

@callback topic() :: String.t()

validate(map)

@callback validate(map()) :: {:ok, map()} | {:error, [validation_error()]}

Validates a raw map against the schema.

Functions

all_schemas()

@spec all_schemas() :: [module()]

Returns all registered schema modules.

compatible_with(versions)

(macro)

decode(schema_module, payload, headers \\ %{})

@spec decode(module(), map(), map()) :: {:ok, map()} | {:error, term()}

Decodes and validates a raw payload against a schema, migrating if needed.

Pass the message headers so the migrator can detect older payload versions. Returns {:ok, validated_payload} or {:error, reason}.

field(name, type, opts \\ [])

(macro)

publish(schema_module, payload, opts \\ [])

@spec publish(module(), map(), keyword()) :: :ok | {:error, term()}

Validates and publishes a message with schema metadata in headers. Returns {:error, {:schema_validation_failed, errors}} if validation fails.

schema_for_topic(topic)

@spec schema_for_topic(String.t()) :: {:ok, module()} | {:error, :not_found}

Returns the schema module registered for a given topic.

schema_version(v)

(macro)

topic(t)

(macro)