ConfluentSchema (confluent_schema v0.1.3)

Provides cache and validation for confluent schemas.

Returns a Supervisor.child_spec() for server to periodically fetch and cache Confluent schemas.

Validates the payload against the schema for the given subject. If you need to validate against multiple schema registries, use the cache_name option.

@type errors() :: [{message :: binary(), path :: binary()}]

@spec child_spec(Keyword.t()) :: Supervisor.child_spec()

You can get credentials from Confluent Cloud: Login > Home > Environments. Or you can also spin off your own Confluent Schema Registry server.



  • period - Period in milliseconds to update schemas (optional integer, default 5 minutes)
  • debug - Enable debug logs (optional boolean, default false)
  • name - Name of the GenServer and ETS cache, (optional atom)
    Name is useful when multiple schema registries are needed.
    Default names are ConfluentSchema.Server for the GenServer and ConfluentSchema.Cache for the ETS table.


ConfluentSchemaRegistry options

  • base_url - URL of schema registry (optional, default "http://localhost:8081")
  • username - username or api key (optional)
  • password - password or api secret (optional)
  • adapter - Tesla Adapter (optional, default Tesla.Adapter.Hackney)
  • middleware - List of Tesla middlewares (optional)
  • local - Whether to use local schemas from priv/confluent_schema/*.json
    Optional, default false. When set all above options are ignored.
  • app_name - Name of the application where local schemas are
    App name is required when local is true



 opts = [
  name: :my_custom_registry,
  period: :timer.minutes(5),
  debug: false,
  base_url: "http://localhost:8081",
  username: "key",
  password: "api secret",
  adapter: {Tesla.Adapter.Hackney, hackney_opts},
  middleware: []

children = [{ConfluentSchema, opts}]
Supervisor.start_link(children, strategy: :one_for_one)
validate(payload, subject, cache_name \\ ConfluentSchema.Cache)

@spec validate(map(), binary(), atom()) ::
  {:ok, map()} | {:error, :not_found} | {:error, errors()} | no_return()

iex> ConfluentSchema.validate("a string example", "subject")
{:ok, "a string example"}

iex> ConfluentSchema.validate("a string example", "unknown-subject")
{:error, :not_found}

iex> ConfluentSchema.validate(123, "subject")
{:error, [{"Type mismatch. Expected String but got Integer.", "#"}]}