Rill
Translation of Eventide Framework in Elixir. Please refer to their documentation for learning purposes or finding out arguments of some functions.
Installation
Install via Hex:
def deps do
[
{:rill, ">= 0.0.0"}
]
end
Memory
To use the in-memory MessageStore, Rill.MessageStore.Memory.Server needs to be started and Session needs to be configured with the server pid (or name).
Mnesia (memory)
To use the Mnesia MessageStore, Rill.MessageStore.Mnesia.start() needs to be
called once and Session needs to be configured with a namespace (any string
works). Optionally the session can be initialized using rand/0, which returns
a tuple with {%Session{}, uuid} (uuid is a String)
Ecto.Postgres
To use the Ecto.Postgres MessageStore, ecto and ecto_sql, packages are
needed.
A Ruby installation is needed.
Following the steps for Eventide Postgres Setup, up to Create the Message Store Database (included), will ensure the database is correctly created.
A Repo module needs to be created, following these guidelines:
defmodule MyRepo do
use Ecto.Repo, otp_app: :your_app
@doc """
Dynamically loads the repository url from the
DATABASE_URL environment variable.
"""
def init(_, opts) do
# This part is entirely optional
{:ok, Keyword.put(opts, :url, System.get_env("DATABASE_URL"))}
end
end
The Repo module needs to be started and supplied to Session during
configuration.
Getting Started
Memory
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
{:ok, pid} = Rill.MessageStore.Memory.Server.start_link()
session = Rill.MessageStore.Memory.Session.new(pid)
message = %Renamed{name: "foo"}
Rill.MessageStore.write(session, message, "person")
Mnesia (memory)
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
Rill.MessageStore.Mnesia.start()
namespace = Rill.Identifier.UUID.Random.get()
session = Rill.MessageStore.Mnesia.Session.new(namespace)
# Alternatively:
# session = Rill.MessageStore.Mnesia.Session.rand()
message = %Renamed{name: "foo"}
Rill.MessageStore.write(session, message, "person")
Postgres
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
{:ok, _pid} = MyRepo.start_link([name: MyRepo])
session = Rill.MessageStore.Ecto.Postgres.Session.new(MyRepo)
message = %Renamed{name: "foo"}
Rill.MessageStore.write(session, message, "person")
Framework at a Glance
defmodule Person do
defstruct [:name, :age]
end
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
defmodule Person.Projection do
use Rill, :projection
@impl Rill.EntityProjection
def apply(%Renamed{} = renamed, person) do
Map.put(person, :name, renamed.name)
end
end
defmodule Repo do
use Ecto.Repo,
otp_app: :my_app,
adapter: Ecto.Adapters.Postgres
def init(_, opts) do
{:ok, Keyword.put(opts, :url, System.get_env("DATABASE_URL"))}
end
end
defmodule Store do
use Rill, [
:store,
entity: %Person{},
category: "person",
projection: Person.Projection
]
end
defmodule Handler do
use Rill, :handler
@impl Rill.Messaging.Handler
def handle(%Renamed{} = renamed, _session) do
IO.inspect(renamed)
end
end
defmodule Run do
def run do
{:ok, pid1} = Repo.start_link(name: Repo)
session = Rill.MessageStore.Ecto.Postgres.Session.new(Repo)
renamed = %Renamed{name: "Joe"}
Rill.MessageStore.write(session, renamed, "person-123")
[person, version] = Store.get(session, "123", include: [:version])
person.name # => "Joe"
version # => 0
Supervisor.start_link(
[
{Rill.Consumer,
[
handlers: [Handler],
stream_name: "person",
identifier: "personIdentifier",
session: session,
poll_interval_milliseconds: 10000,
batch_size: 1
]}
],
strategy: :one_for_one
)
:timer.sleep(1500)
# IO.inspect will output `renamed` content
end
end
Features
Read from MessageStore
Reading all messages for a given stream name, can be accomplished with:
Rill.MessageStore.read(session, "streamName") # Returns a stream
A utility is provided for the following common pattern (used in tests):
- Read a message
- Pass the message to a handler
- Repeat the previous 2 steps N times
# Handles all messages from "streamName", 5 times
Rill.MessageStore.Reader.handle(session, "streamName", HandlerModule, 5)
The handle function is provided as utility for testing, it's not meant to
be used in production code. Notice that handle returns the Session, so it
can be piped.
Define a Message
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
message = %Renamed{name: "foo"}
Write to MessageStore
Simple write
Rill.MessageStore.write(session, message, "streamName")
Write with expected version
Rill.MessageStore.write(session, message, "streamName", expected_version: version)
Write initial message
Raises exception if 1+ messages are present already on the stream
Rill.MessageStore.write_initial(session, message, "streamName")
Write one message
Like write_initial but instead of raising, returns nil. If a
message is written, a non-negative integer is returned instead
Rill.MessageStore.write_once(session, message, "streamName")
Write batch of messages to the same stream
Rill.MessageStore.write(session, [msg1, msg2], "streamName", expected_version: version)
Define a Projection
defmodule Person do
defstruct [name: "", age: 0]
end
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
defmodule Person.Projection do
use Rill, :projection
@impl Rill.EntityProjection
# Pattern matching on the first argument it's REQUIRED to determine the
# struct that needs to be used to decode the message coming from the
# MessageStore
def apply(%Renamed{} = renamed, person) do
Map.put(person, :name, renamed.name)
end
end
# A projection is not usually called directly
# Simulates a message coming from the database
message_data = %Rill.MessageStore.MessageData.Read{
data: %{name: "Joe"},
type: "Renamed"
}
person = %Person{name: "Fran", age: 29}
person = Person.Projection.apply(person, message_data)
person.name # => "Joe"
Create a Session
A Rill.Session is just a struct, but there are some utilities available.
Memory
{:ok, pid} = Rill.MessageStore.Memory.Server.start_link()
session = Rill.MessageStore.Memory.Session.new(pid)
Mnesia (memory)
Rill.MessageStore.Mnesia.start()
namespace = Rill.Identifier.UUID.Random.get()
session = Rill.MessageStore.Mnesia.Session.new(namespace)
Alternatively:
Rill.MessageStore.Mnesia.start()
session = Rill.MessageStore.Mnesia.Session.rand()
Postgres
{:ok, _} = Repo.start_link([name: Repo])
session = Rill.MessageStore.Ecto.Postgres.Session.new(repo)
Define a Store
defmodule Store do
use Rill, [
:store
# Initial value for the entity, can be omitted and defaults to `nil`
entity: %Person{},
# Stream name category
category: "person",
# Projection module
projection: Person.Projection
]
end
[person] = Store.fetch("123")
# Including the current version
[person, version] = Store.fetch("123", include: :version) # or [:version]
Define a Handler
defmodule Renamed do
use Rill, :message
defstruct([:name])
end
defmodule Handler do
use Rill, :handler
@impl Rill.Messaging.Handler
# Pattern matching on the first argument it's REQUIRED to determine the
# struct that needs to be used to decode the message coming from the
# MessageStore
def handle(%Renamed{} = renamed, _session) do
IO.inspect(renamed)
IO.puts("hello")
end
end
# A handler is not usually called directly
# Simulates a message coming from the database
message_data = %Rill.MessageStore.MessageData.Read{
data: %{name: "Joe"},
type: "Renamed"
}
Handler.handle(session, message_data)
When a handler is defined using use Rill, :handler, the following utilities
are provided:
try_versionmacro, which rescue anyRill.MessageStore.ExpectedVersion.Errorand returnsnilMessageStoreis available (automatically aliased)Messageis available (automatically aliased)- The
stream_name/1,/2and/3are imported automatically fromRill.MessageStore.StreamName
Start a Consumer
Rill.Consumer.start_link([
# The following arguments are all required
handlers: [Handler],
stream_name: "person",
# Must be supplied to uniquely identify this consumer
identifier: "personIdentifier",
session: session
# Optionally can pass a `condition` argument, handled by the underlying
# MessageStore adapter
])
Alternatively, a Consumer can be started from a Supervisor:
Supervisor.start_link(
[
{Rill.Consumer,
[
handlers: [Handler],
stream_name: "person",
identifier: "personIdentifier",
session: session
]}
],
strategy: :one_for_one
)
TODO
- [x] Logging
- [ ] Tests
- [ ] Test utilities
- [ ] Position Store
- [ ] Store Cache
- [ ] Snapshotting
- [ ] View Data