yggdrasil v3.2.2 Yggdrasil View Source
Yggdrasil is an immense mythical tree that connects the nine worlds in Norse cosmology.
Yggdrasil manages subscriptions to channels/queues in several brokers with
the possibility to add more. Simple Redis, RabbitMQ and PostgreSQL adapters
are implemented. Message passing is done through Phoenix.PubSub adapters.
Yggdrasil also manages publishing pools to Redis, RabbitMQ and PostgreSQL
using :poolboy library.
This library provides three functions i.e:
To subscribe:
Yggdrasil.subscribe(Yggdrasil.Channel.t()) :: :okTo unsubscribe:
Yggdrasil.unsubscribe(Yggdrasil.Channel.t()) :: :okTo publish:
Yggdrasil.publish(Yggdrasil.Channel.t(), message()) :: :ok when message: term
On subscription, the client receives a message with the following structure:
{:Y_CONNECTED, Yggdrasil.Channel.t()}
and when the server publishes a message to the channel the message received by the clients has the following structure:
{:Y_EVENT, Yggdrasil.Channel.t(), message()} when message: term()
The channels can be defined with the structure Yggdrasil.Channel.t(), e.g:
channel = %Yggdrasil.Channel{
name: "redis_channel",
transformer: Yggdrasil.Transformer.Default,
adapter: :redis,
namespace: TestRedis
}
where:
name- It’s the name of the channel. ForRedisadapter, the name of the channel is a string.transformer- Module that contains the functionsdecode/2andencode/2to transform the messages. The defaulttransformerYggdrasil.Default.Transformermodule does nothing to the messages.adapter- It’s the adapter to be used for subscriptions and/or publishing. In this case:redisis the adapter used both for subscriptions and publishing.namespace: The namespace for the adapter configuration to allow several adapter configurations run at once depending on the needs of the system. By default isYggdrasile.g:use Mix.Config config :yggdrasil, redis: [hostname: "localhost"]but for
TestRedisnamespace would be like this:use Mix.Config config: :yggdrasil, TestRedis, redis: [hostname: "localhost"]
Small Example
The following example uses the Elixir distribution to send the messages. It is equivalent to the Redis, RabbitMQ and Postgres distributions when a connection configuration is supplied:
iex(1)> Yggdrasil.subscribe(%Yggdrasil.Channel{name: "channel"})
iex(2)> flush()
{:Y_CONNECTED, %Yggdrasil.Channel{name: "channel", (...)}}
and to publish a for the subscribers:
iex(3)> Yggdrasil.publish(%Yggdrasil.Channel{name: "channel"}, "message")
iex(4)> flush()
{:Y_EVENT, %Yggdrasil.Channel{name: "channel", (...)}, "message"}
Adapters
The provided subscription adapters are the following (all are equivalents):
- For
Elixir:Yggdrasil.Subscriber.Adapter.ElixirandYggdrasil.Distributor.Adapter.Elixir. - For
Redis:Yggdrasil.Subscriber.Adapter.RedisandYggdrasil.Distributor.Adapter.Redis. - For
RabbitMQ:Yggdrasil.Subscriber.Adapter.RabbitMQandYggdrasil.Distributor.Adapter.RabbitMQ. - For
Postgres:Yggdrasil.Subscriber.Adapter.PostgresandYggdrasil.Distributor.Adapter.Postgres.
The provided publisher adapters are the following:
- For
Elixir:Yggdrasil.Publisher.Adapter.Elixir. - For
Redis:Yggdrasil.Publisher.Adapter.Redis. - For
RabbitMQ:Yggdrasil.Publisher.Adapter.RabbitMQ. - For
Postgres:Yggdrasil.Publisher.Adapter.Postgres.
Also there is possible to use hibrid adapters. They work both subscriptions and publishing (recommended):
- For
Elixir::elixirandYggdrasil.Elixir. - For
Redis::redisandYggdrasil.Redis. - For
RabbitMQ::rabbitmqandYggdrasil.RabbitMQ. - For
Postgres::postgresandYggdrasil.Postgres.
The format for the channels for every adapter is the following:
- For
Elixiradapter: any valid term. - For
Redisadapter: a string. - For
RabbitMQadapter: a tuple with exchange and routing key. - For
Postgresadapter: a string.
Custom Transformers
The only available transformer module is Yggdrasil.Transformer.Default and
does nothing to the messages. It’s possible to define custom transformers to
decode and encode messages from and to the brokers respectively e.g:
defmodule QuoteTransformer do
use Yggdrasil.Transformer
alias Yggdrasil.Channel
def decode(%Channel{} = _channel, message) do
with {:ok, quoted} <- Code.string_to_quoted(message),
{encoded, _} <- Code.eval_quoted(quoted),
do: {:ok, encoded}
end
def encode(%Channel{} = _channel, data) do
encoded = inspect(data)
{:ok, encoded}
end
end
and using the RabbitMQ adapter the channel would be:
iex(1)> channel = %Yggdrasil.Channel{
...(1)> name: {"amq.topic", "quotes"},
...(1)> adapter: :rabbitmq,
...(1)> transformer: QuoteTransformer
...(1)> }
where the name of the channel for this adapter is a tuple with the
exchange name and the routing key. Then:
iex(2)> Yggdrasil.subscribe(channel)
:ok
iex(3)> flush()
{:Y_CONNECTED, %Yggdrasil.Channel{} = _channel}
iex(4)> Yggdrasil.publish(channel, %{"answer" => 42})
:ok
iex(5)> flush()
{:Y_EVENT, %Yggdrasil.Channel{} = _channel, %{"answer" => 42}}
Configuration
There are several configuration options and all of them should be in the
general configuration for Yggdrasil:
pubsub_adapter-Phoenix.PubSubadapter. By defaultPhoenix.PubSub.PG2is used.pubsub_name- Name of thePhoenix.PubSubadapter. By default isYggdrasil.PubSub.pubsub_options- Options of thePhoenix.PubSubadapter. By default are[pool_size: 1].publisher_options-Poolboyoptions for publishing. By default are[size: 5, max_overflow: 10].registry- Process name registry. By default is usedExReg.
Also it can be specified the connection configurations with or without namespace:
redis- List of options ofRedix:hostname- Redis hostname.port- Redis port.password- Redis password.
rabbitmq- List of options ofAMQP:hostname- RabbitMQ hostname.port- RabbitMQ port.username- Username.password- Password.virtual_host- Virtual host.subscriber_options-poolboyoptions for RabbitMQ subscriber. By default are[size: 5, max_overflow: 10].
postgres- List of options ofPostgrex:hostname- Postgres hostname.port- Postgres port.username- Postgres username.password- Postgres password.database- Postgres database name.
For more information about configuration using OS environment variables look
at Yggdrasil.Settings.
Link to this section Summary
Link to this section Functions
publish(Yggdrasil.Channel.t, term) :: :ok | {:error, term}
Publishes a message in a channel.
subscribe(Yggdrasil.Channel.t) :: :ok | {:error, term}
Subscribes to a channel.
unsubscribe(Yggdrasil.Channel.t) :: :ok | {:error, term}
Unsubscribes from a channel.