Pulsar.Schema (Pulsar v2.8.6)


Schema definition for Pulsar messages.

Pulsar schemas enable type-safe messaging by defining the structure of message data. When a producer connects with a schema, the broker registers it and returns a schema version that is included in every message.

Supported Types

  • Primitive: :None, :String, :Bool, :Int8, :Int16, :Int32, :Int64, :Float, :Double
  • Structured: :Json, :Avro, :Protobuf, :ProtobufNative, :KeyValue
  • Temporal: :Date, :Time, :Timestamp, :Instant, :LocalDate, :LocalTime, :LocalDateTime

See new/1 for usage examples.

Summary

Functions

new(opts)

Creates a new schema.

new!(opts)

Creates a new schema, raising on error.

to_binary(schema)

Converts a schema to the binary protocol format.

Types

schema_type()

@type schema_type() ::
  :AutoConsume
  | :ProtobufNative
  | :LocalDateTime
  | :LocalTime
  | :LocalDate
  | :Instant
  | :KeyValue
  | :Timestamp
  | :Time
  | :Date
  | :Double
  | :Float
  | :Int64
  | :Int32
  | :Int16
  | :Int8
  | :Bool
  | :Avro
  | :Protobuf
  | :Json
  | :String
  | :None

t()

@type t() :: %Pulsar.Schema{
  definition: binary(),
  name: String.t() | nil,
  properties: map() | nil,
  type: schema_type()
}

Functions

new(opts)

@spec new(keyword()) :: {:ok, t()} | {:error, term()}

Creates a new schema.

Options

  • :type - (required) The schema type (e.g., :Json, :String, :Avro)
  • :definition - The schema definition (required for non-primitive types). For :Json and :Avro schemas, it can be a struct, a map, or a binary string. Structs and maps are automatically JSON-encoded.
  • :name - Optional schema name
  • :properties - Optional metadata properties as a map

Returns

  • {:ok, schema} on success
  • {:error, reason} if validation fails

Examples

# JSON schema (Pulsar uses Avro-style record format)
{:ok, schema} = Pulsar.Schema.new(
  type: :Json,
  definition: %{
    type: "record",
    name: "User",
    fields: [%{name: "id", type: "int"}]
  }
)

# String schema (primitive, no definition needed)
{:ok, schema} = Pulsar.Schema.new(type: :String)

# With properties
{:ok, schema} = Pulsar.Schema.new(
  type: :Avro,
  definition: avro_schema,
  name: "my-schema",
  properties: %{"version" => "1.0"}
)
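The examples above match directly on {:ok, schema}, which raises a MatchError if validation fails. A minimal sketch of handling both return shapes explicitly is shown below. It assumes the pulsar package is available as a dependency. SchemaExample and build_user_schema/0 are hypothetical names used only for illustration, and the shape of reason is not documented here, so it is only inspected.

```elixir
# Assumes the `pulsar` package is available as a dependency.
defmodule SchemaExample do
  # Hypothetical helper, not part of Pulsar.Schema.
  def build_user_schema do
    opts = [
      type: :Json,
      definition: %{
        type: "record",
        name: "User",
        fields: [%{name: "id", type: "int"}]
      },
      name: "user-schema"
    ]

    case Pulsar.Schema.new(opts) do
      {:ok, %Pulsar.Schema{} = schema} ->
        {:ok, schema}

      {:error, reason} ->
        # The shape of `reason` is not specified in these docs,
        # so it is only rendered with inspect/1.
        {:error, "invalid schema: #{inspect(reason)}"}
    end
  end
end
```

Returning the tagged tuple from the helper lets callers decide whether a bad schema should stop startup or only be logged.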

new!(opts)

@spec new!(keyword()) :: t()

Creates a new schema, raising on error.

Same as new/1 but raises ArgumentError if validation fails.
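As a rough illustration of the difference from new/1, the sketch below calls new!/1 once with valid options and once inside try/rescue. It assumes the pulsar package is available as a dependency. It also assumes that omitting :definition for a non-primitive type such as :Json counts as a validation failure. That assumption follows from the option being described as required, but the exact validation rules are not spelled out here.

```elixir
# Assumes the `pulsar` package is available as a dependency.

# Valid options return the struct directly, with no {:ok, _} tuple to unwrap.
schema = Pulsar.Schema.new!(type: :String)
IO.inspect(schema.type)

# Assumption: a :Json schema without :definition fails validation.
result =
  try do
    Pulsar.Schema.new!(type: :Json)
  rescue
    error in ArgumentError -> {:rescued, Exception.message(error)}
  end

IO.inspect(result)
```

new!/1 tends to suit schemas that are fixed at compile time or startup, where an invalid definition is a programming error rather than a condition to recover from.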

to_binary(schema)

@spec to_binary(t() | nil) :: Pulsar.Protocol.Binary.Pulsar.Proto.Schema.t() | nil

Converts a schema to the binary protocol format.

Returns nil if the input is nil.
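Because to_binary/1 accepts nil, an optional schema can be passed through without a separate branch. The following is a small sketch of both cases, assuming the pulsar package is available as a dependency. The fields of the returned protocol struct are not described in this section, so the result is only inspected.

```elixir
# Assumes the `pulsar` package is available as a dependency.

# A nil input is passed through unchanged.
nil = Pulsar.Schema.to_binary(nil)

# A schema struct is converted to the protocol representation.
{:ok, schema} = Pulsar.Schema.new(type: :String)
proto = Pulsar.Schema.to_binary(schema)
IO.inspect(proto)
```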