system_registry v0.8.2 SystemRegistry

SystemRegistry is a transactional nested term storage and dispatch system. It takes a different approach to a typical publish-subscribe pattern by focusing on data instead of events. SystemRegistry is local (as opposed to distributed) and transactional (as opposed to asynchronous) to eliminate race conditions. It also supports eventual consistency with rate-limiting consumers that control how often they receive state updates.

Data in SystemRegistry is stored as a tree of nodes, represented by a nested map. In order to perform operations on the registry data, you specify the scope of the operation as a list of keys to walk to the desired tree node.

Link to this section Summary

Functions

Commit a transaction. Attempts to apply all changes. If successful, will notify_all

Execute an transaction to delete keys and their values

Delete all keys owned by the calling process

Query the SystemRegistry using a match spec

Move a node from one scope to another

Register process to receive notifications. Registrants are rate-limited and require that you pass an interval. Upon registration, the caller will receive the current state

Returns a transaction struct to pass to update/3 and delete/4 to chain modifications to to group. Prevents notifying registrants for each action. Example

Unregister process from receiving notifications

Unregister process from receiving notifications

Execute an transaction to insert or modify data

Execute an transaction to modify data in place by passing a modifier function

Link to this section Types

Link to this type

scope()
scope() :: [term()]

Link to this section Functions

Link to this function

commit(transaction)
commit(SystemRegistry.Transaction.t()) ::
  {:ok, {new :: map(), old :: map()}} | {:error, term()}

Commit a transaction. Attempts to apply all changes. If successful, will notify_all.

Link to this function

delete(t, scope \\ nil)
delete(transaction, scope()) :: SystemRegistry.Transaction.t()
when transaction: SystemRegistry.Transaction.t()
delete(scope(), Keyword.t() | nil) ::
  {:ok, {new :: map(), old :: map()}} | {:error, term()}

Execute an transaction to delete keys and their values.

Delete can be called on its own:

iex> SystemRegistry.update([:a], 1)
{:ok, {%{a: 1}, %{}}}
iex> SystemRegistry.delete([:a])
{:ok, {%{}, %{a: 1}}}

Or it can be included as part of a transaction pipeline

iex> SystemRegistry.update([:a], 1)
{:ok, {%{a: 1}, %{}}}
iex> SystemRegistry.transaction |> SystemRegistry.delete([:a]) |> SystemRegistry.commit
{:ok, {%{}, %{a: 1}}}

If you pass an internal node to delete, it will delete all the keys the process ownes under it.

iex> SystemRegistry.update([:a, :b], 1)
{:ok, {%{a: %{b: 1}}, %{}}}
iex> SystemRegistry.delete([:a])
{:ok, {%{}, %{a: %{b: 1}}}}
Link to this function

delete_all(pid \\ nil)
delete_all(pid() | nil) ::
  {:ok, {new :: map(), old :: map()}} | {:error, term()}

Delete all keys owned by the calling process.

iex> SystemRegistry.update([:a, :b], 1)
{:ok, {%{a: %{b: 1}}, %{}}}
iex> SystemRegistry.delete_all()
{:ok, {%{}, %{a: %{b: 1}}}}
Link to this function

match(key \\ :global, match_spec)
match(key :: term(), match_spec :: term()) :: map()

Query the SystemRegistry using a match spec.

iex> SystemRegistry.update([:a, :b], 1)
{:ok, {%{a: %{b: 1}}, %{}}}
iex> SystemRegistry.match(self(), :_)
%{a: %{b: 1}}
iex> SystemRegistry.match(self(), %{a: %{}})
%{a: %{b: 1}}
iex> SystemRegistry.match(self(), %{a: %{b: 2}})
%{}
Link to this function

move(t, old_scope, new_scope \\ nil)
move(transaction, scope(), scope()) :: SystemRegistry.Transaction.t()
when transaction: SystemRegistry.Transaction.t()
move(scope_arg, scope(), opts :: nil | Keyword.t()) ::
  {:ok, {new :: map(), old :: map()}} | {:error, term()}
when scope_arg: scope()

Move a node from one scope to another

Move can be called on its own:

iex> SystemRegistry.update([:a], 1)
{:ok, {%{a: 1}, %{}}}
iex> SystemRegistry.move([:a], [:b])
{:ok, {%{b: 1}, %{a: 1}}}

Or it can be included as part of a transaction pipeline

iex> SystemRegistry.update([:a], 1)
{:ok, {%{a: 1}, %{}}}
iex> SystemRegistry.transaction |> SystemRegistry.move([:a], [:b]) |> SystemRegistry.commit
{:ok, {%{b: 1}, %{a: 1}}}
Link to this function

register(opts \\ [])
register(opts :: keyword()) :: :ok | {:error, term()}

Register process to receive notifications. Registrants are rate-limited and require that you pass an interval. Upon registration, the caller will receive the current state.

options

* `:hysteresis` - Default: 0, The amount of time to wait before delivering the first
  change message.
* `:min_interval` - Default: 0, The minimum amount of time to wait after hysteresis,
  but before the next message is to be delivered.

With both options defaulting to , you will receive every message.

Examples

iex> mailbox = fn ->
...>   receive do
...>     msg -> msg
...>   after
...>     5 -> nil
...>   end
...> end
iex> SystemRegistry.register()
:ok
iex> mailbox.()
{:system_registry, :global, %{}}
iex> SystemRegistry.update([:state, :a], 1)
{:ok, {%{state: %{a: 1}}, %{}}}
iex> :timer.sleep(50)
:ok
iex> mailbox.()
{:system_registry, :global, %{state: %{a: 1}}}
iex> SystemRegistry.unregister()
:ok
iex> mailbox.()
nil
iex> SystemRegistry.delete_all()
{:ok, {%{}, %{state: %{a: 1}}}}
iex> SystemRegistry.register(hysteresis: 10, min_interval: 50)
:ok
iex> mailbox.()
iex> SystemRegistry.update([:state, :a], 2)
{:ok, {%{state: %{a: 2}}, %{}}}
iex> :timer.sleep(1)
iex> mailbox.()
nil
iex> :timer.sleep(15)
iex> mailbox.()
{:system_registry, :global, %{state: %{a: 2}}}
iex> SystemRegistry.update([:state, :a], 3)
{:ok, {%{state: %{a: 3}}, %{state: %{a: 2}}}}
iex> mailbox.()
nil
iex> :timer.sleep(50)
:ok
iex> mailbox.()
{:system_registry, :global, %{state: %{a: 3}}}
Link to this function

transaction(opts \\ [])
transaction(opts :: Keyword.t()) :: SystemRegistry.Transaction.t()

Returns a transaction struct to pass to update/3 and delete/4 to chain modifications to to group. Prevents notifying registrants for each action. Example:

iex> SystemRegistry.transaction |> SystemRegistry.update([:a], 1) |> SystemRegistry.commit
{:ok, {%{a: 1}, %{}}}
Link to this function

unregister(key \\ :global)
unregister(key :: term()) :: :ok | {:error, term()}

Unregister process from receiving notifications

Link to this function

unregister_all(pid \\ nil)
unregister_all(pid() | nil) :: :ok | {:error, term()}

Unregister process from receiving notifications

Link to this function

update(t, scope, value \\ nil)
update(one, scope(), value :: any()) :: SystemRegistry.Transaction.t()
when one: SystemRegistry.Transaction.t()
update(one, value :: any(), opts :: nil | Keyword.t()) ::
  {:ok, {new :: map(), old :: map()}} | {:error, term()}
when one: scope()

Execute an transaction to insert or modify data.

Update can be called on its own:

iex> SystemRegistry.update([:a], 1)
{:ok, {%{a: 1}, %{}}}

Or it can be included as part of a transaction pipeline

iex> SystemRegistry.transaction |> SystemRegistry.update([:a], 1) |> SystemRegistry.commit
{:ok, {%{a: 1}, %{}}}

Passing a map to update will recursively expand into a transaction for example this:

iex> SystemRegistry.update([:a], %{b: 1})
{:ok, {%{a: %{b: 1}}, %{}}}

is equivalent to this:

iex>  SystemRegistry.update([:a, :b], 1)
{:ok, {%{a: %{b: 1}}, %{}}}
Link to this function

update_in(scope, fun, opts \\ [])
update_in(scope(), (term() -> term()), opts :: keyword()) ::
  {:ok, {new :: map(), old :: map()}} | {:error, term()}

Execute an transaction to modify data in place by passing a modifier function.

Allows for the manipulation of the value at the scope. Useful for when the value needs to be modified in place.

For Example:

iex> SystemRegistry.update([:a], [1])
{:ok, {%{a: [1]}, %{}}}
iex> SystemRegistry.update_in([:a], fn(value) -> [2 | value] end)
{:ok, {%{a: [2, 1]}, %{a: [1]}}}