Cloudevents (cloudevents v0.4.0)

[![Hex pm](https://img.shields.io/hexpm/v/cloudevents.svg?style=flat-square)](https://hex.pm/packages/cloudevents)[![Hex Docs](https://img.shields.io/badge/api-docs-blue.svg?style=flat-square)](https://hexdocs.pm/cloudevents)[![Apache 2.0 license](https://img.shields.io/hexpm/l/cloudevents.svg?style=flat-square)](./LICENSE)

Cloudevents is an Elixir SDK for consuming and producing CloudEvents with support for various transport protocols and codecs. If you want to learn more about the specification itself, make sure to check out the official spec on GitHub.

Cloudevents is released under the Apache License 2.0 - see the LICENSE file.

## Status

Work in progress.

| Spec | Status |
| :--------------------------- | :-------------------------------- |
| **Core Specification:** | |
| CloudEvents | v0.1, v0.2, v1.0 |
| | |
| **Optional Specifications:** | |
| AMQP Protocol Binding | out of scope for now (PR welcome) |
| AVRO Event Format | todo (PR welcome) |
| HTTP Protocol Binding | wip (done except batch mode) |
| JSON Event Format | done |
| Kafka Protocol Binding | v1.0 |
| MQTT Protocol Binding | out of scope for now (PR welcome) |
| NATS Protocol Binding | out of scope for now (PR welcome) |
| Web hook | out of scope for now (PR welcome) |

## Getting started

Add cloudevents to your list of dependencies in mix.exs:

def deps do
  [{:cloudevents, "~> 0.4.0"}]
end

Use Cloudevents.from_map/1 to create your first Cloudevent and see its JSON representation using Cloudevents.to_json/1.
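
As a minimal sketch of that first step, the script below builds an event from a map and prints its JSON form. It assumes the package installs and compiles through `Mix.install/1` in your environment; the `type`, `source`, `id` and `data` values are made up for illustration.

```elixir
# Assumption: fetching the dependency via Mix.install/1 works in your setup.
Mix.install([{:cloudevents, "~> 0.4.0"}])

# Hypothetical attribute values; only the keys follow the CloudEvents spec.
{:ok, event} =
  Cloudevents.from_map(%{
    specversion: "1.0",
    type: "com.example.greeting",
    source: "/example/source",
    id: "1",
    data: %{"hello" => "world"}
  })

# to_json/1 returns the JSON document as a binary.
json = Cloudevents.to_json(event)
IO.puts(json)
```

Matching on `{:ok, event}` makes the script fail fast if the map is rejected; use a `case` instead when invalid input is expected.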

If you're dealing with HTTP requests, Cloudevents.from_http_message/2, Cloudevents.to_http_binary_message/1 and Cloudevents.to_http_structured_message/2 are your friends.

If you need Avro, add Cloudevents to your supervision tree:

children = [
  Cloudevents
]

Supervisor.start_link(children, strategy: :one_for_one)

Or start Cloudevents manually:

{:ok, pid} = Cloudevents.start_link()

Summary

Types

avro_cache_ttl()

Time in ms to cache Avro schemas in memory (default 300_000)

avro_event_schema_name()

Name of the Avro schema used to encode events

avro_schemas_path()

Base path for locally stored schema files (default ./priv/schemas)

confluent_schema_registry_url()

Confluent Schema Registry URL for resolving Avro schemas by ID

http_body()

HTTP body

http_headers()

HTTP headers

kafka_body()

Kafka body

kafka_headers()

Kafka headers

option()

Configuration parameter

options()

Configuration parameters for encoding and decoding of data

t()

Cloudevent

Functions

Returns a specification to start this module under a supervisor.

Decodes an Avro-encoded Cloudevent (requires Cloudevents.start_link/1).

Parses an HTTP request as one or more Cloudevents.

Decodes a JSON-encoded Cloudevent.

Decodes a JSON-encoded Cloudevent and raises if decoding fails.

Parses a Kafka message as a Cloudevent.

Converts an Elixir map into a Cloudevent.

Converts an Elixir map into a Cloudevent and raises if the map is not a valid event.

Runs the cloudevents supervisor; needed for Avro support and its schema caching.

Stops the cloudevents supervisor.

Encodes a Cloudevent using Avro binary encoding (requires Cloudevents.start_link/1).

Serializes an event in HTTP binary content mode.

Serializes an event in HTTP structured content mode.

Encodes a Cloudevent using JSON format.

Serializes an event in Kafka binary content mode.

Serializes an event in Kafka structured content mode.

Converts a Cloudevent into an Elixir map. See also Cloudevents.from_map/1.

Types

Specs

avro_cache_ttl() :: non_neg_integer()

Time in ms to cache Avro schemas in memory (default 300_000)

avro_event_schema_name()

Specs

avro_event_schema_name() :: String.t()

Name of the Avro schema used to encode events

Specs

avro_schemas_path() :: String.t()

Base path for locally stored schema files (default ./priv/schemas)

confluent_schema_registry_url()

Specs

confluent_schema_registry_url() :: String.t()

Confluent Schema Registry URL for resolving Avro schemas by ID

Specs

http_body() :: binary()

HTTP body

Specs

http_headers() :: [{String.t(), String.t()}]

HTTP headers

Specs

kafka_body() :: binary()

Kafka body

Specs

kafka_headers() :: [{String.t(), String.t()}]

Kafka headers

Specs

option() ::
  {:confluent_schema_registry_url, confluent_schema_registry_url()}
  | {:avro_schemas_path, avro_schemas_path()}
  | {:avro_cache_ttl, avro_cache_ttl()}
  | {:avro_event_schema_name, avro_event_schema_name()}

Configuration parameter

Specs

options() :: [option()]

Configuration parameters for encoding and decoding of data
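
To illustrate how these options fit together, here is a hedged sketch that passes a keyword list to `Cloudevents.start_link/1`. The values are placeholders, and only the two options with documented defaults are set. Whether startup succeeds without a schema registry URL or an event schema name is an assumption not confirmed by this page.

```elixir
# Assumption: fetching the dependency via Mix.install/1 works in your setup.
Mix.install([{:cloudevents, "~> 0.4.0"}])

# Keys come from the option() type; the values are illustrative only.
opts = [
  avro_schemas_path: "./priv/schemas",
  # Cache schemas for one minute instead of the default 300_000 ms.
  avro_cache_ttl: 60_000
]

# start_link/1 is specified to return {:ok, pid} or {:error, reason}.
case Cloudevents.start_link(opts) do
  {:ok, pid} -> IO.puts("Cloudevents started: #{inspect(pid)}")
  {:error, reason} -> IO.puts("Cloudevents failed to start: #{inspect(reason)}")
end
```

The remaining keys, `:confluent_schema_registry_url` and `:avro_event_schema_name`, take strings and can be added to the same list.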

t()

Cloudevent

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

from_avro(avro, ctx_attrs)

Specs

from_avro(avro :: binary(), ctx_attrs :: map()) ::
  {:ok, t()}
  | {:error,
     %Cloudevents.Format.Decoder.DecodeError{
       __exception__: term(),
       cause: term()
     }}

Decodes an Avro-encoded Cloudevent (requires Cloudevents.start_link/1).

TODO: tests/examples

from_http_message(http_body, http_headers)

Specs

from_http_message(http_body(), http_headers()) :: {:ok, [t()]} | {:error, any()}

Parses an HTTP request as one or more Cloudevents.

Note that the HTTP request may contain more than one event (called a "batch"). Because of this, the function always returns a list of Cloudevents. Use pattern matching if you expect single events only:

# `<-` (not `=`) lets a failed match fall through to the else clauses
with {:ok, [the_event]} <- from_http_message(body, headers) do
  "do something with the_event"
else
  {:ok, _events} -> "oops got a batch of events"
  {:error, error} -> "failed to parse HTTP request: #{inspect(error)}"
end
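
The following sketch exercises the same pattern end to end: it serializes an event in HTTP binary mode and passes the resulting body and headers back to `from_http_message/2`. It assumes the package installs through `Mix.install/1`, and it does not assume the parsed event equals the original.

```elixir
# Assumption: fetching the dependency via Mix.install/1 works in your setup.
Mix.install([{:cloudevents, "~> 0.4.0"}])

event =
  Cloudevents.from_map!(%{
    specversion: "1.0",
    type: "some-type",
    source: "some-source",
    id: "1",
    data: %{"foo" => "bar"}
  })

# Serialize in binary content mode, then feed the result back into the parser.
{body, headers} = Cloudevents.to_http_binary_message(event)

outcome =
  case Cloudevents.from_http_message(body, headers) do
    {:ok, [single]} -> {:single, Cloudevents.to_map(single)}
    {:ok, events} -> {:batch, length(events)}
    {:error, error} -> {:error, error}
  end

IO.inspect(outcome)
```

A `case` keeps all three outcomes explicit, which can be easier to extend than `with` once batches need real handling.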

Specs

from_json(json :: binary()) ::
  {:ok, t()}
  | {:error,
     %Cloudevents.Format.Decoder.DecodeError{
       __exception__: term(),
       cause: term()
     }}

Decodes a JSON-encoded Cloudevent.

Specs

from_json!(json :: binary()) :: t()

Decodes a JSON-encoded Cloudevent and raises if decoding fails.
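
A short sketch contrasting the two variants, assuming the package installs through `Mix.install/1`. The JSON document is a made-up minimal event, and the `cause` field is read from the `DecodeError` struct shown in the spec above.

```elixir
# Assumption: fetching the dependency via Mix.install/1 works in your setup.
Mix.install([{:cloudevents, "~> 0.4.0"}])

# Hypothetical minimal event in the CloudEvents JSON format.
json = ~s({"specversion":"1.0","type":"test","source":"test","id":"1"})

# from_json/1 reports problems as a tagged tuple.
result =
  case Cloudevents.from_json(json) do
    {:ok, event} -> {:decoded, Cloudevents.to_map(event)}
    {:error, error} -> {:failed, error.cause}
  end

IO.inspect(result)

# from_json!/1 returns the event directly, so failures must be rescued.
outcome =
  try do
    {:decoded, Cloudevents.from_json!("this is not JSON")}
  rescue
    error -> {:raised, error.__struct__}
  end

IO.inspect(outcome)
```

Prefer `from_json/1` for untrusted input and keep `from_json!/1` for cases where a failure should crash the caller.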

from_kafka_message(kafka_body, kafka_headers)

Specs

from_kafka_message(kafka_body(), kafka_headers()) ::
  {:ok, t()} | {:error, any()}

Parses a Kafka message as a Cloudevent.
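
As a hedged illustration, the sketch below produces a Kafka message with `to_kafka_binary_message/1` and parses it back. It assumes the package installs through `Mix.install/1`. No Kafka client is involved: the body and headers are passed along as plain Elixir terms.

```elixir
# Assumption: fetching the dependency via Mix.install/1 works in your setup.
Mix.install([{:cloudevents, "~> 0.4.0"}])

event =
  Cloudevents.from_map!(%{
    specversion: "1.0",
    type: "some-type",
    source: "some-source",
    id: "1",
    data: %{"foo" => "bar"}
  })

# Produce the body and headers as they would be handed to a Kafka client.
{body, headers} = Cloudevents.to_kafka_binary_message(event)

# Unlike from_http_message/2, a Kafka message yields a single event.
outcome =
  case Cloudevents.from_kafka_message(body, headers) do
    {:ok, parsed} -> {:ok, Cloudevents.to_map(parsed)}
    {:error, error} -> {:error, error}
  end

IO.inspect(outcome)
```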

Specs

from_map(map :: %{required(atom()) => any()} | %{required(String.t()) => any()}) ::
  {:ok, t()}
  | {:error,
     %Cloudevents.Format.ParseError{__exception__: term(), message: term()}}

Converts an Elixir map into a Cloudevent.

Examples

iex> Cloudevents.from_map(%{"specversion" => "1.0", "type" => "test", "source" => "test", "id" => "1"})
{:ok,
 %Cloudevents.Format.V_1_0.Event{
   data: nil,
   datacontenttype: nil,
   dataschema: nil,
   extensions: %{},
   id: "1",
   source: "test",
   specversion: "1.0",
   subject: nil,
   time: nil,
   type: "test"
 }}

iex> Cloudevents.from_map(%{specversion: "1.0", type: "test", source: "test", id: "1"})
{:ok,
 %Cloudevents.Format.V_1_0.Event{
   data: nil,
   datacontenttype: nil,
   dataschema: nil,
   extensions: %{},
   id: "1",
   source: "test",
   specversion: "1.0",
   subject: nil,
   time: nil,
   type: "test"
 }}

Specs

from_map!(
  map :: %{required(atom()) => any()} | %{required(String.t()) => any()}
) :: t()

Converts an Elixir map into a Cloudevent and raises if the map is not a valid event.

handle_call(atom, from, state)

Specs

start_link(options()) :: {:ok, pid()} | {:error, any()}

Runs the cloudevents supervisor; needed for Avro support and its schema caching.

Stops the cloudevents supervisor.

Specs

to_avro(t()) :: {:ok, binary()} | {:error, term()}

Encodes a Cloudevent using Avro binary encoding (requires Cloudevents.start_link/1).

TODO: tests/examples

to_http_binary_message(event)

Specs

to_http_binary_message(t()) :: {http_body(), http_headers()}

Serializes an event in HTTP binary content mode.

In binary mode, the payload is carried in the body and the metadata in the headers.

iex> event = Cloudevents.from_map!(%{
...>   specversion: "1.0",
...>   type: "some-type",
...>   source: "some-source",
...>   id: "1",
...>   data: %{"foo" => "bar"}})
iex> {body, headers} = Cloudevents.to_http_binary_message(event)
{
  "{\"foo\":\"bar\"}",
  [
    {"content-type", "application/json"},
    {"ce-specversion", "1.0"},
    {"ce-type", "some-type"},
    {"ce-source", "some-source"},
    {"ce-id", "1"}
  ]
}

to_http_structured_message(event, event_format)

Specs

to_http_structured_message(t(), event_format :: :json | :avro_binary) ::
  {:ok, {http_body(), http_headers()}}

Serializes an event in HTTP structured content mode.

In structured mode, the full event (payload and metadata) is carried in the body.
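
A minimal sketch of structured mode over HTTP with the `:json` event format, assuming the package installs through `Mix.install/1`. The exact body and headers are not shown here because this page does not document them for HTTP.

```elixir
# Assumption: fetching the dependency via Mix.install/1 works in your setup.
Mix.install([{:cloudevents, "~> 0.4.0"}])

event =
  Cloudevents.from_map!(%{
    specversion: "1.0",
    type: "some-type",
    source: "some-source",
    id: "1",
    data: %{"foo" => "bar"}
  })

# The spec lists only the {:ok, {body, headers}} return shape.
{:ok, {body, headers}} = Cloudevents.to_http_structured_message(event, :json)

IO.puts(body)
IO.inspect(headers)
```

Passing `:avro_binary` instead of `:json` additionally requires a running Cloudevents process, as noted for the Avro functions.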

Specs

to_json(t()) :: binary()

Encodes a Cloudevent using JSON format.

to_kafka_binary_message(event)

Specs

to_kafka_binary_message(t()) :: {kafka_body(), kafka_headers()}

Serializes an event in Kafka binary content mode.

In binary mode, the payload is carried in the body and the metadata in the headers.

iex> event = Cloudevents.from_map!(%{
...>   specversion: "1.0",
...>   type: "some-type",
...>   source: "some-source",
...>   id: "1",
...>   data: %{"foo" => "bar"}})
iex> {body, headers} = Cloudevents.to_kafka_binary_message(event)
{
  "{\"foo\":\"bar\"}",
  [
    {"content-type", "application/json"},
    {"ce_specversion", "1.0"},
    {"ce_type", "some-type"},
    {"ce_source", "some-source"},
    {"ce_id", "1"}
  ]
}

to_kafka_structured_message(event, event_format)

Specs

to_kafka_structured_message(t(), event_format :: :json | :avro_binary) ::
  {:ok, {kafka_body(), kafka_headers()}} | {:error, term()}

Serializes an event in Kafka structured content mode.

In structured mode, the full event (payload and metadata) is carried in the body.

iex> event = Cloudevents.from_map!(%{
...>   specversion: "1.0",
...>   type: "some-type",
...>   source: "some-source",
...>   id: "1",
...>   data: %{"foo" => "bar"}})
iex> {:ok, {body, headers}} = Cloudevents.to_kafka_structured_message(event, :json)
iex> {body, headers}
{
  "{\"data\":{\"foo\":\"bar\"},\"datacontenttype\":\"application/json\",\"id\":\"1\",\"source\":\"some-source\",\"specversion\":\"1.0\",\"type\":\"some-type\"}",
  [{"content-type", "application/cloudevents+json"}]
}

Note that Avro encoding requires a preceding call to Cloudevents.start_link/1.

Specs

to_map(t()) :: %{required(atom()) => any()}

Converts a Cloudevent into an Elixir map. See also Cloudevents.from_map/1.

Examples

iex> Cloudevents.to_map(Cloudevents.from_map!(%{"specversion" => "1.0", "type" => "test", "source" => "test", "id" => "1"}))
%{specversion: "1.0", type: "test", source: "test", id: "1"}