View Source Cloudevents (cloudevents v0.6.1)

[![Hex pm](https://img.shields.io/hexpm/v/cloudevents.svg?style=flat-square)](https://hex.pm/packages/cloudevents) [![Hex Docs](https://img.shields.io/badge/api-docs-blue.svg?style=flat-square)](https://hexdocs.pm/cloudevents) [![Apache 2.0 license](https://img.shields.io/hexpm/l/cloudevents.svg?style=flat-square)](./LICENSE)

Cloudevents is an Elixir SDK for consuming and producing CloudEvents with support for various transport protocols and codecs. If you want to learn more about the specification itself, make sure to check out the official spec on GitHub.

Cloudevents is released under the Apache License 2.0 - see the LICENSE file.

## Supported versions * OTP 25.0 and later * Elixir 1.13 and later ## Status | Spec | Status | | :--------------------------- | :-------------------------------- | | **Core Specification:** | | | CloudEvents | v0.1, v0.2, v1.0 | | | | | **Optional Specifications:** | | | AMQP Protocol Binding | out of scope for now (PR welcome) | | AVRO Event Format | todo (PR welcome) | | HTTP Protocol Binding | wip (done except batch mode) | | JSON Event Format | done | | Kafka Protocol Binding | v1.0 | | MQTT Protocol Binding | out of scope for now (PR welcome) | | NATS Protocol Binding | out of scope for now (PR welcome) | | Web hook | out of scope for now (PR welcome) |

getting-started

Getting started

Add cloudevents to your list of dependencies in mix.exs:

def deps do
  [{:cloudevents, "~> 0.6.1"}]
end

Use Cloudevents.from_map/1 to create your first Cloudevent and see its JSON representation using Cloudevents.to_json/1.

If you're dealing with HTTP requests, Cloudevents.from_http_message/2, Cloudevents.to_http_binary_message/1 and Cloudevents.to_http_structured_message/2 are your friends.

If you need Avro, you need to add avrora to your dependency list:

def deps do
  [{:avrora, "~> 0.21"}]
end

Then, you need to add Cloudevents to your supervisor:

children = [
  Cloudevents
]

Supervisor.start_link(children, strategy: :one_for_one)

Or start Cloudevents manually:

{:ok, pid} = Cloudevents.start_link([])

Link to this section Summary

Types

Time in ms to cache Avro schemas in memory (default 300_000)

Name of the Avro-schema used to encode events

Base path for locally stored schema files (default ./priv/schemas)

Confluent Schema Registry URL for resolving Avro schemas by ID

HTTP body

HTTP headers

Kafka body

Kafka headers

Configuration parameter

Configuration parameters for encoding and decoding of data

t()

Cloudevent

Functions

Returns a specification to start this module under a supervisor.

Decodes an Avro-encoded Cloudevent (requires Cloudevents.start_link/1).

Parses a HTTP request as one or more Cloudevents.

Decodes a JSON-encoded Cloudevent and panics otherwise.

Decodes a JSON-encoded Cloudevent.

Parses a Kafka message as a Cloudevent.

Converts an Elixir map into a Cloudevent and panics otherwise.

Converts an Elixir map into a Cloudevent.

Runs the cloudevents supervisor; needed for Avro support and its schema caching.

Stops the cloudevents supervisor.

Encodes a Cloudevent using Avro binary encoding (requires Cloudevents.start_link/1).

Serialize an event in HTTP binary content mode.

Serialize an event in HTTP structured content mode.

Encodes a Cloudevent using JSON format.

Serialize an event in Kafka binary content mode.

Serialize an event in Kafka structured content mode.

Converts a Cloudevent into an Elixir map. See also Cloudevents.from_map/1.

Link to this section Types

@type avro_cache_ttl() :: non_neg_integer()

Time in ms to cache Avro schemas in memory (default 300_000)

Link to this type

avro_event_schema_name()

View Source
@type avro_event_schema_name() :: String.t()

Name of the Avro-schema used to encode events

@type avro_schemas_path() :: String.t()

Base path for locally stored schema files (default ./priv/schemas)

Link to this type

confluent_schema_registry_url()

View Source
@type confluent_schema_registry_url() :: String.t()

Confluent Schema Registry URL for resolving Avro schemas by ID

@type http_body() :: binary()

HTTP body

@type http_headers() :: [{String.t(), String.t()}]

HTTP headers

@type kafka_body() :: binary()

Kafka body

@type kafka_headers() :: [{String.t(), String.t()}]

Kafka headers

@type option() ::
  {:confluent_schema_registry_url, confluent_schema_registry_url()}
  | {:avro_schemas_path, avro_schemas_path()}
  | {:avro_cache_ttl, avro_cache_ttl()}
  | {:avro_event_schema_name, avro_event_schema_name()}

Configuration parameter

@type options() :: [option()]

Configuration parameters for encoding and decoding of data

Cloudevent

Link to this section Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

from_avro(avro, ctx_attrs)

View Source
@spec from_avro(avro :: binary(), ctx_attrs :: map()) ::
  {:ok, t()}
  | {:error,
     %Cloudevents.Format.Decoder.DecodeError{
       __exception__: term(),
       cause: term()
     }}

Decodes an Avro-encoded Cloudevent (requires Cloudevents.start_link/1).

TODO: tests/examples

Link to this function

from_http_message(http_body, http_headers)

View Source
@spec from_http_message(http_body(), http_headers()) :: {:ok, [t()]} | {:error, any()}

Parses a HTTP request as one or more Cloudevents.

Note that the HTTP request may contain more than one event (called a "batch"). Because of this, the function always returns a list of Cloudevents. Use pattern matching if you expect single events only:

with {:ok, [the_event]} = from_http_message(body, headers) do
  "do something with the_event"
else
  {:ok, events} -> "oops got a batch of events"
  {:error, error} -> "failed to parse HTTP request: #{inspect(error)}"
end
@spec from_json!(json :: binary()) :: t()

Decodes a JSON-encoded Cloudevent and panics otherwise.

@spec from_json(json :: binary()) ::
  {:ok, t()}
  | {:error,
     %Cloudevents.Format.Decoder.DecodeError{
       __exception__: term(),
       cause: term()
     }}

Decodes a JSON-encoded Cloudevent.

Link to this function

from_kafka_message(kafka_body, kafka_headers)

View Source
@spec from_kafka_message(kafka_body(), kafka_headers()) ::
  {:ok, t()} | {:error, any()}

Parses a Kafka message as a Cloudevent.

@spec from_map!(
  map :: %{required(atom()) => any()} | %{required(String.t()) => any()}
) :: t()

Converts an Elixir map into a Cloudevent and panics otherwise.

@spec from_map(map :: %{required(atom()) => any()} | %{required(String.t()) => any()}) ::
  {:ok, t()}
  | {:error,
     %Cloudevents.Format.ParseError{__exception__: term(), message: term()}}

Converts an Elixir map into a Cloudevent.

examples

Examples

iex> Cloudevents.from_map(%{"specversion" => "1.0", "type" => "test", "source" => "test", "id" => "1"})
{:ok,
 %Cloudevents.Format.V_1_0.Event{
   data: nil,
   datacontenttype: nil,
   dataschema: nil,
   extensions: %{},
   id: "1",
   source: "test",
   specversion: "1.0",
   subject: nil,
   time: nil,
   type: "test"
 }}

iex> Cloudevents.from_map(%{specversion: "1.0", type: "test", source: "test", id: "1"})
{:ok,
 %Cloudevents.Format.V_1_0.Event{
   data: nil,
   datacontenttype: nil,
   dataschema: nil,
   extensions: %{},
   id: "1",
   source: "test",
   specversion: "1.0",
   subject: nil,
   time: nil,
   type: "test"
 }}
Link to this function

handle_call(atom, from, state)

View Source
@spec start_link(options()) :: {:ok, pid()} | {:error, any()}

Runs the cloudevents supervisor; needed for Avro support and its schema caching.

Stops the cloudevents supervisor.

@spec to_avro(t()) :: {:ok, binary()} | {:error, term()}

Encodes a Cloudevent using Avro binary encoding (requires Cloudevents.start_link/1).

TODO: tests/examples

Link to this function

to_http_binary_message(event)

View Source
@spec to_http_binary_message(t()) :: {http_body(), http_headers()}

Serialize an event in HTTP binary content mode.

Binary mode basically means: the payload is in the body and the metadata is in the header.

iex> event = Cloudevents.from_map!(%{
...>   specversion: "1.0",
...>   type: "some-type",
...>   source: "some-source",
...>   id: "1",
...>   data: %{"foo" => "bar"}})
iex> {_body, _headers} = Cloudevents.to_http_binary_message(event)
{
  "{\"foo\":\"bar\"}",
  [
    {"content-type", "application/json"},
    {"ce-specversion", "1.0"},
    {"ce-type", "some-type"},
    {"ce-source", "some-source"},
    {"ce-id", "1"}
  ]
}
Link to this function

to_http_structured_message(event, event_format)

View Source
@spec to_http_structured_message(t(), event_format :: :json | :avro_binary) ::
  {:ok, {http_body(), http_headers()}}

Serialize an event in HTTP structured content mode.

Structured mode basically means: the full event - payload and metadata - is in the body.

@spec to_json(t()) :: binary()

Encodes a Cloudevent using JSON format.

Link to this function

to_kafka_binary_message(event)

View Source
@spec to_kafka_binary_message(t()) :: {kafka_body(), kafka_headers()}

Serialize an event in Kafka binary content mode.

Binary mode basically means: the payload is in the body and the metadata is in the header.

iex> event = Cloudevents.from_map!(%{
...>   specversion: "1.0",
...>   type: "some-type",
...>   source: "some-source",
...>   id: "1",
...>   data: %{"foo" => "bar"}})
iex> {_body, _headers} = Cloudevents.to_kafka_binary_message(event)
{
  "{\"foo\":\"bar\"}",
  [
    {"content-type", "application/json"},
    {"ce_specversion", "1.0"},
    {"ce_type", "some-type"},
    {"ce_source", "some-source"},
    {"ce_id", "1"}
  ]
}
Link to this function

to_kafka_structured_message(event, event_format)

View Source
@spec to_kafka_structured_message(t(), event_format :: :json | :avro_binary) ::
  {:ok, {kafka_body(), kafka_headers()}} | {:error, term()}

Serialize an event in Kafka structured content mode.

Structured mode basically means: the full event - payload and metadata - is in the body.

iex> event = Cloudevents.from_map!(%{
...>   specversion: "1.0",
...>   type: "some-type",
...>   source: "some-source",
...>   id: "1",
...>   data: %{"foo" => "bar"}})
iex> {:ok, {body, headers}} = Cloudevents.to_kafka_structured_message(event, :json)
iex> {body, headers}
{
  "{\"data\":{\"foo\":\"bar\"},\"datacontenttype\":\"application/json\",\"id\":\"1\",\"source\":\"some-source\",\"specversion\":\"1.0\",\"type\":\"some-type\"}",
  [{"content-type", "application/cloudevents+json"}]
}

Note that Avro encoding requires a preceding call to Cloudevents.start_link/1.

@spec to_map(t()) :: %{required(atom()) => any()}

Converts a Cloudevent into an Elixir map. See also Cloudevents.from_map/1.

examples

Examples

iex> Cloudevents.to_map(Cloudevents.from_map!(%{"specversion" => "1.0", "type" => "test", "source" => "test", "id" => "1"}))
%{specversion: "1.0", type: "test", source: "test", id: "1"}