Elixir v1.4.2 Registry

A local, decentralized and scalable key-value process storage.

It allows developers to look up one or more processes with a given key. If the registry has :unique keys, a key points to 0 or 1 processes. If the registry allows :duplicate keys, a single key may point to any number of processes. In both cases, different keys could identify the same process.

Each entry in the registry is associated with the process that registered the key. If the process crashes, the keys associated with that process are automatically removed. All key comparisons in the registry are done using strict equality (===).
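
Because keys are compared with ===, an integer key and a float key of the same numeric value are treated as different keys. The following is a minimal sketch of that behaviour; Registry.StrictKeySketch is a hypothetical registry name chosen only for this illustration:

```elixir
# Registry.StrictKeySketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:unique, Registry.StrictKeySketch)
{:ok, _} = Registry.register(Registry.StrictKeySketch, 1, :integer)

# 1 == 1.0 is true but 1 === 1.0 is false, so the float key finds nothing.
float_lookup = Registry.lookup(Registry.StrictKeySketch, 1.0)
#=> []

# The integer key used on registration is found as expected.
integer_lookup = Registry.lookup(Registry.StrictKeySketch, 1)
#=> [{self(), :integer}]
```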

The registry can be used for different purposes, such as name lookups (using the :via option), storing properties, custom dispatching rules, or a pubsub implementation. We explore some of those use cases below.

The registry may also be transparently partitioned, which provides more scalable behaviour when running registries in highly concurrent environments with thousands or millions of entries.

Using in :via

Once the registry is started with a given name (using Registry.start_link/2), it can be used to register and access named processes using the {:via, Registry, {registry, key}} tuple:

{:ok, _} = Registry.start_link(:unique, Registry.ViaTest)
name = {:via, Registry, {Registry.ViaTest, "agent"}}
{:ok, _} = Agent.start_link(fn -> 0 end, name: name)
Agent.get(name, & &1)
#=> 0
Agent.update(name, & &1 + 1)
Agent.get(name, & &1)
#=> 1

Typically the registry is started as part of a supervision tree though:

supervisor(Registry, [:unique, Registry.ViaTest])

Only registries with unique keys can be used in :via. If the name is already taken, the case-specific start_link function (Agent.start_link/2 in the example above) will return {:error, {:already_started, current_pid}}.
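
As a rough illustration of the name-conflict case described above, the sketch below starts two agents under the same :via name. Registry.ViaConflictSketch is a hypothetical registry name used only here:

```elixir
# Registry.ViaConflictSketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:unique, Registry.ViaConflictSketch)
name = {:via, Registry, {Registry.ViaConflictSketch, "agent"}}

# The first agent takes the name.
{:ok, first} = Agent.start_link(fn -> 0 end, name: name)

# The second start attempt reports the pid that already holds the name.
second = Agent.start_link(fn -> 0 end, name: name)
#=> {:error, {:already_started, first}}

# The key still points at the first agent only.
[{holder, _value}] = Registry.lookup(Registry.ViaConflictSketch, "agent")
```

Matching on the {:error, {:already_started, pid}} tuple lets the caller reuse the existing process instead of treating the conflict as a failure.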

Using as a dispatcher

Registry has a dispatch mechanism that allows developers to implement custom dispatch logic triggered from the caller. For example, let’s say we have a duplicate registry started like so:

{:ok, _} = Registry.start_link(:duplicate, Registry.DispatcherTest)

By calling register/3, different processes can register under a given key and associate any value under that key. In this case, let’s register the current process under the key "hello" and attach the {IO, :inspect} tuple to it:

{:ok, _} = Registry.register(Registry.DispatcherTest, "hello", {IO, :inspect})

Now, an entity interested in dispatching events for a given key may call dispatch/3 passing in the key and a callback. This callback will be invoked with a list of all the values registered under the requested key, alongside the pid of the process that registered each value, in the form of {pid, value} tuples. In our example, value will be the {module, function} tuple in the code above:

Registry.dispatch(Registry.DispatcherTest, "hello", fn entries ->
  for {pid, {module, function}} <- entries, do: apply(module, function, [pid])
end)
# Prints #PID<...> where the pid is for the process that called register/3 above
#=> :ok

Dispatching happens in the process that calls dispatch/3 either serially or concurrently in case of multiple partitions (via spawned tasks). The registered processes are not involved in dispatching unless involving them is done explicitly (for example, by sending them a message in the callback).
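
To make the point about the calling process concrete, the sketch below uses a registry with the default single partition and records which process ran the callback. Registry.CallerSketch is a hypothetical name, and the {:ran_in, pid} message is an illustration, not part of the Registry API:

```elixir
# Registry.CallerSketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:duplicate, Registry.CallerSketch)
{:ok, _} = Registry.register(Registry.CallerSketch, "hello", :value)

caller = self()

# With a single partition the callback runs in the caller itself.
:ok =
  Registry.dispatch(Registry.CallerSketch, "hello", fn _entries ->
    send(caller, {:ran_in, self()})
  end)

ran_in =
  receive do
    {:ran_in, pid} -> pid
  end

ran_in == caller
#=> true
```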

Furthermore, if there is a failure when dispatching, due to a bad registration, dispatching will always fail and the registered process will not be notified. Therefore let’s make sure we at least wrap and report those errors:

require Logger
Registry.dispatch(Registry.DispatcherTest, "hello", fn entries ->
  for {pid, {module, function}} <- entries do
    try do
      apply(module, function, [pid])
    catch
      kind, reason ->
        formatted = Exception.format(kind, reason, System.stacktrace)
        Logger.error "Registry.dispatch/3 failed with #{formatted}"
    end
  end
end)
# Prints #PID<...>
#=> :ok

You could also replace the whole apply system by explicitly sending messages. That’s the example we will see next.

Using as a PubSub

Registries can also be used to implement a local, non-distributed, scalable PubSub by relying on the dispatch/3 function, similarly to the previous section: in this case, however, we will send messages to each associated process, instead of invoking a given module-function.

In this example, we will also set the number of partitions to the number of schedulers online, which will make the registry more performant on highly concurrent environments as each partition will spawn a new process, allowing dispatching to happen in parallel:

{:ok, _} = Registry.start_link(:duplicate, Registry.PubSubTest,
                               partitions: System.schedulers_online)
{:ok, _} = Registry.register(Registry.PubSubTest, "hello", [])
Registry.dispatch(Registry.PubSubTest, "hello", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:broadcast, "world"})
end)
#=> :ok

The example above broadcast the message {:broadcast, "world"} to all processes registered under the “topic” (or “key” as we called it until now) "hello".

The third argument given to register/3 is a value associated with the current process. While in the previous section we used it when dispatching, in this particular example we are not interested in it, so we have set it to an empty list. You could store a more meaningful value if necessary.
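
The example above only shows the publishing side. A possible sketch of a separate subscriber process follows; Registry.PubSubSketch and the :subscribed and {:received, pid, message} messages are hypothetical names introduced for this illustration:

```elixir
# Registry.PubSubSketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:duplicate, Registry.PubSubSketch)
parent = self()

# The subscriber registers itself under the topic, then waits for a broadcast.
subscriber =
  spawn_link(fn ->
    {:ok, _} = Registry.register(Registry.PubSubSketch, "hello", [])
    send(parent, :subscribed)

    receive do
      {:broadcast, message} -> send(parent, {:received, self(), message})
    end
  end)

# Wait until the subscriber has registered before publishing.
receive do
  :subscribed -> :ok
end

:ok =
  Registry.dispatch(Registry.PubSubSketch, "hello", fn entries ->
    for {pid, _} <- entries, do: send(pid, {:broadcast, "world"})
  end)

received =
  receive do
    {:received, ^subscriber, message} -> message
  end
#=> "world"
```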

Registrations

Looking up, dispatching and registering are efficient and immediate at the cost of delayed unsubscription. For example, if a process crashes, its keys are automatically removed from the registry but the change may not propagate immediately. This means certain operations may return processes that are already dead. Where this can happen, it is explicitly stated in the function documentation.

However, keep in mind those cases are typically not an issue. After all, a process referenced by a pid may crash at any time, including between getting the value from the registry and sending it a message. Many parts of the standard library are designed to cope with that, such as Process.monitor/1, which will deliver the :DOWN message immediately if the monitored process is already dead, and Kernel.send/2, which acts as a no-op for dead processes.
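
The two standard library behaviours mentioned above can be seen without a registry at all. The sketch below uses only Kernel and Process functions on a process that has already exited:

```elixir
# Spawn a process that exits right away and wait until it is really gone.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# Sending to a dead pid does not raise; send/2 simply returns the message.
sent = send(pid, :ignored)
#=> :ignored

# Monitoring a dead pid delivers :DOWN straight away with reason :noproc.
late_ref = Process.monitor(pid)

late_reason =
  receive do
    {:DOWN, ^late_ref, :process, ^pid, reason} -> reason
  end
#=> :noproc
```

Code that obtains pids from a registry can rely on the same pattern: monitor first, then treat a :DOWN message as the signal that the entry was stale.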

ETS

Note that the registry uses one ETS table plus two ETS tables per partition.

Summary

Types

  • key() - The type of keys allowed on registration
  • kind() - The type of the registry
  • meta_key() - The type of registry metadata keys
  • meta_value() - The type of registry metadata values
  • registry() - The registry identifier
  • value() - The type of values allowed on registration

Functions

  • dispatch(registry, key, mfa_or_fun) - Invokes the callback with all entries under key in each partition for the given registry
  • keys(registry, pid) - Returns the known keys for the given pid in registry in no particular order
  • lookup(registry, key) - Finds the {pid, value} pair for the given key in registry in no particular order
  • match(registry, key, pattern) - Returns {pid, value} pairs under the given key in registry that match pattern
  • meta(registry, key) - Reads registry metadata given on start_link/3
  • put_meta(registry, key, value) - Stores registry metadata
  • register(registry, key, value) - Registers the current process under the given key in registry
  • start_link(kind, registry, options \\ []) - Starts the registry as a supervisor process
  • unregister(registry, key) - Unregisters all entries for the given key associated to the current process in registry
  • update_value(registry, key, callback) - Updates the value for key for the current process in the unique registry

Types

key()
key() :: term

The type of keys allowed on registration

kind()
kind() :: :unique | :duplicate

The type of the registry

meta_key()
meta_key() :: atom | tuple

The type of registry metadata keys

meta_value()
meta_value() :: term

The type of registry metadata values

registry()
registry() :: atom

The registry identifier

value()
value() :: term

The type of values allowed on registration

Functions

dispatch(registry, key, mfa_or_fun)
dispatch(registry, key, (entries :: [{pid, value}] -> term)) :: :ok

Invokes the callback with all entries under key in each partition for the given registry.

The list of entries is a non-empty list of two-element tuples where the first element is the pid and the second element is the value associated to the pid. If there are no entries for the given key, the callback is never invoked.

If the registry is not partitioned, the callback is invoked in the process that calls dispatch/3. If the registry is partitioned, the callback is invoked concurrently per partition by starting a task linked to the caller. The callback, however, is only invoked if there are entries for that partition.

See the module documentation for examples of using the dispatch/3 function for building custom dispatching or a pubsub system.
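
As a small illustration of the rule that the callback is never invoked when a key has no entries, the sketch below dispatches on a key nobody registered. Registry.EmptyDispatchSketch and the {:invoked, entries} message are hypothetical names used only here:

```elixir
# Registry.EmptyDispatchSketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:duplicate, Registry.EmptyDispatchSketch)
caller = self()

# Nothing is registered under "missing", so the callback should not run.
result =
  Registry.dispatch(Registry.EmptyDispatchSketch, "missing", fn entries ->
    send(caller, {:invoked, entries})
  end)
#=> :ok

# With a single partition the callback would have run synchronously,
# so an empty mailbox here means it was skipped.
outcome =
  receive do
    {:invoked, _entries} -> :invoked
  after
    0 -> :not_invoked
  end
#=> :not_invoked
```

Since dispatch/3 returns :ok either way, a caller that needs to know whether anyone was listening has to track that itself, for example with lookup/2.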

keys(registry, pid)
keys(registry, pid) :: [key]

Returns the known keys for the given pid in registry in no particular order.

If the registry is unique, the keys are unique. Otherwise they may contain duplicates if the process was registered under the same key multiple times. The list will be empty if the process is dead or it has no keys in this registry.

Examples

Registering under a unique registry does not allow multiple entries:

iex> Registry.start_link(:unique, Registry.UniqueKeysTest)
iex> Registry.keys(Registry.UniqueKeysTest, self())
[]
iex> {:ok, _} = Registry.register(Registry.UniqueKeysTest, "hello", :world)
iex> Registry.register(Registry.UniqueKeysTest, "hello", :later) # registry is :unique
{:error, {:already_registered, self()}}
iex> Registry.keys(Registry.UniqueKeysTest, self())
["hello"]

Duplicate registries, however, allow it:

iex> Registry.start_link(:duplicate, Registry.DuplicateKeysTest)
iex> Registry.keys(Registry.DuplicateKeysTest, self())
[]
iex> {:ok, _} = Registry.register(Registry.DuplicateKeysTest, "hello", :world)
iex> {:ok, _} = Registry.register(Registry.DuplicateKeysTest, "hello", :world)
iex> Registry.keys(Registry.DuplicateKeysTest, self())
["hello", "hello"]

lookup(registry, key)
lookup(registry, key) :: [{pid, value}]

Finds the {pid, value} pair for the given key in registry in no particular order.

Returns an empty list if there is no match.

For unique registries, a single partition lookup is necessary. For duplicate registries, all partitions must be looked up.

Examples

In the example below we register the current process and look it up both from itself and other processes:

iex> Registry.start_link(:unique, Registry.UniqueLookupTest)
iex> Registry.lookup(Registry.UniqueLookupTest, "hello")
[]
iex> {:ok, _} = Registry.register(Registry.UniqueLookupTest, "hello", :world)
iex> Registry.lookup(Registry.UniqueLookupTest, "hello")
[{self(), :world}]
iex> Task.async(fn -> Registry.lookup(Registry.UniqueLookupTest, "hello") end) |> Task.await
[{self(), :world}]

The same applies to duplicate registries:

iex> Registry.start_link(:duplicate, Registry.DuplicateLookupTest)
iex> Registry.lookup(Registry.DuplicateLookupTest, "hello")
[]
iex> {:ok, _} = Registry.register(Registry.DuplicateLookupTest, "hello", :world)
iex> Registry.lookup(Registry.DuplicateLookupTest, "hello")
[{self(), :world}]
iex> {:ok, _} = Registry.register(Registry.DuplicateLookupTest, "hello", :another)
iex> Enum.sort(Registry.lookup(Registry.DuplicateLookupTest, "hello"))
[{self(), :another}, {self(), :world}]

match(registry, key, pattern)
match(registry, key, match_pattern :: atom | tuple) :: [{pid, term}]

Returns {pid, value} pairs under the given key in registry that match pattern.

Pattern must be an atom or a tuple that will match the structure of the value stored in the registry. The atom :_ can be used to ignore a given value or tuple element, while :"$1" can be used to temporarily assign part of pattern to a variable for a subsequent comparison.

An empty list will be returned if there is no match.

For unique registries, a single partition lookup is necessary. For duplicate registries, all partitions must be looked up.

Examples

In the example below we register the current process under the same key in a duplicate registry but with different values:

iex> Registry.start_link(:duplicate, Registry.MatchTest)
iex> {:ok, _} = Registry.register(Registry.MatchTest, "hello", {1, :atom, 1})
iex> {:ok, _} = Registry.register(Registry.MatchTest, "hello", {2, :atom, 2})
iex> Registry.match(Registry.MatchTest, "hello", {1, :_, :_})
[{self(), {1, :atom, 1}}]
iex> Registry.match(Registry.MatchTest, "hello", {2, :_, :_})
[{self(), {2, :atom, 2}}]
iex> Registry.match(Registry.MatchTest, "hello", {:_, :atom, :_}) |> Enum.sort()
[{self(), {1, :atom, 1}}, {self(), {2, :atom, 2}}]
iex> Registry.match(Registry.MatchTest, "hello", {:"$1", :_, :"$1"}) |> Enum.sort()
[{self(), {1, :atom, 1}}, {self(), {2, :atom, 2}}]

meta(registry, key)
meta(registry, meta_key) :: {:ok, meta_value} | :error

Reads registry metadata given on start_link/3.

Atoms and tuples are allowed as keys.

Examples

iex> Registry.start_link(:unique, Registry.MetaTest, meta: [custom_key: "custom_value"])
iex> Registry.meta(Registry.MetaTest, :custom_key)
{:ok, "custom_value"}
iex> Registry.meta(Registry.MetaTest, :unknown_key)
:error

put_meta(registry, key, value)
put_meta(registry, meta_key, meta_value) :: :ok

Stores registry metadata.

Atoms and tuples are allowed as keys.

Examples

iex> Registry.start_link(:unique, Registry.PutMetaTest)
iex> Registry.put_meta(Registry.PutMetaTest, :custom_key, "custom_value")
:ok
iex> Registry.meta(Registry.PutMetaTest, :custom_key)
{:ok, "custom_value"}
iex> Registry.put_meta(Registry.PutMetaTest, {:tuple, :key}, "tuple_value")
:ok
iex> Registry.meta(Registry.PutMetaTest, {:tuple, :key})
{:ok, "tuple_value"}

register(registry, key, value)
register(registry, key, value) ::
  {:ok, pid} |
  {:error, {:already_registered, pid}}

Registers the current process under the given key in registry.

A value to be associated with this registration must also be given. This value will be retrieved whenever dispatching or doing a key lookup.

This function returns {:ok, owner} or {:error, reason}. The owner is the pid of the registry partition responsible for the pid. The owner is automatically linked to the caller.

If the registry has unique keys, it will return {:ok, owner} unless the key is already associated to a pid, in which case it returns {:error, {:already_registered, pid}}.

If the registry has duplicate keys, multiple registrations from the current process under the same key are allowed.
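
The link between the caller and the owner described above can be observed with Process.info/2. This is only a sketch; Registry.OwnerLinkSketch is a hypothetical registry name:

```elixir
# Registry.OwnerLinkSketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:unique, Registry.OwnerLinkSketch)
{:ok, owner} = Registry.register(Registry.OwnerLinkSketch, "hello", :world)

# Process.info/2 lists the pids the current process is linked to.
{:links, links} = Process.info(self(), :links)

linked? = owner in links
#=> true
```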

Examples

Registering under a unique registry does not allow multiple entries:

iex> Registry.start_link(:unique, Registry.UniqueRegisterTest)
iex> {:ok, _} = Registry.register(Registry.UniqueRegisterTest, "hello", :world)
iex> Registry.register(Registry.UniqueRegisterTest, "hello", :later)
{:error, {:already_registered, self()}}
iex> Registry.keys(Registry.UniqueRegisterTest, self())
["hello"]

Duplicate registries, however, allow it:

iex> Registry.start_link(:duplicate, Registry.DuplicateRegisterTest)
iex> {:ok, _} = Registry.register(Registry.DuplicateRegisterTest, "hello", :world)
iex> {:ok, _} = Registry.register(Registry.DuplicateRegisterTest, "hello", :world)
iex> Registry.keys(Registry.DuplicateRegisterTest, self())
["hello", "hello"]

start_link(kind, registry, options \\ [])
start_link(kind, registry, options) ::
  {:ok, pid} |
  {:error, term} when options: [partitions: pos_integer, listeners: [atom], meta: [{meta_key, meta_value}]]

Starts the registry as a supervisor process.

Manually it can be started as:

Registry.start_link(:unique, MyApp.Registry)

In your supervision tree, you would write:

supervisor(Registry, [:unique, MyApp.Registry])

For intensive workloads, the registry may also be partitioned (by specifying the :partitions option). If partitioning is required then a good default is to set the number of partitions to the number of schedulers available:

Registry.start_link(:unique, MyApp.Registry, partitions: System.schedulers_online())

or:

supervisor(Registry, [:unique, MyApp.Registry, [partitions: System.schedulers_online()]])

Options

The registry supports the following options:

  • :partitions - the number of partitions in the registry. Defaults to 1.
  • :listeners - a list of named processes which are notified of :register and :unregister events. The registered process must be monitored by the listener if the listener wants to be notified if the registered process crashes.
  • :meta - a keyword list of metadata to be attached to the registry.
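
The :listeners option has no example on this page, so here is a tentative sketch. The current process registers itself under a name and passes that name as a listener. Registry.ListenerSketch and :registry_listener_sketch are hypothetical names, and the exact shape of the event message is not documented above, so the code only checks its first element:

```elixir
# Both names below are hypothetical and used only in this sketch.
# Listeners must be named processes, so give the current process a name.
true = Process.register(self(), :registry_listener_sketch)

{:ok, _} =
  Registry.start_link(:unique, Registry.ListenerSketch,
    listeners: [:registry_listener_sketch])

{:ok, _} = Registry.register(Registry.ListenerSketch, "hello", :world)

# The listener is notified of the registration. The message layout is an
# assumption here; we only look at the tag in the first position.
event =
  receive do
    message -> message
  after
    1_000 -> :no_event
  end

tag = if is_tuple(event), do: elem(event, 0), else: event
```

In this sketch tag is expected to be :register. A real listener would also monitor the registered pid if it needs to hear about crashes, as noted in the option description.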

unregister(registry, key)
unregister(registry, key) :: :ok

Unregisters all entries for the given key associated to the current process in registry.

Always returns :ok and automatically unlinks the current process from the owner if there are no more keys associated to the current process. See also register/3 to read more about the “owner”.

Examples

It unregisters all entries for key in unique registries:

iex> Registry.start_link(:unique, Registry.UniqueUnregisterTest)
iex> Registry.register(Registry.UniqueUnregisterTest, "hello", :world)
iex> Registry.keys(Registry.UniqueUnregisterTest, self())
["hello"]
iex> Registry.unregister(Registry.UniqueUnregisterTest, "hello")
:ok
iex> Registry.keys(Registry.UniqueUnregisterTest, self())
[]

As well as in duplicate registries:

iex> Registry.start_link(:duplicate, Registry.DuplicateUnregisterTest)
iex> Registry.register(Registry.DuplicateUnregisterTest, "hello", :world)
iex> Registry.register(Registry.DuplicateUnregisterTest, "hello", :world)
iex> Registry.keys(Registry.DuplicateUnregisterTest, self())
["hello", "hello"]
iex> Registry.unregister(Registry.DuplicateUnregisterTest, "hello")
:ok
iex> Registry.keys(Registry.DuplicateUnregisterTest, self())
[]

update_value(registry, key, callback)
update_value(registry, key, (value -> value)) ::
  {new_value :: term, old_value :: term} |
  :error

Updates the value for key for the current process in the unique registry.

Returns a {new_value, old_value} tuple or :error if there is no such key assigned to the current process.

If a non-unique registry is given, an error is raised.

Examples

iex> Registry.start_link(:unique, Registry.UpdateTest)
iex> {:ok, _} = Registry.register(Registry.UpdateTest, "hello", 1)
iex> Registry.lookup(Registry.UpdateTest, "hello")
[{self(), 1}]
iex> Registry.update_value(Registry.UpdateTest, "hello", & &1 + 1)
{2, 1}
iex> Registry.lookup(Registry.UpdateTest, "hello")
[{self(), 2}]
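
The :error return value is not covered by the example above. The sketch below shows the two situations in which it would be expected: an unknown key, and a key that belongs to a different process. Registry.UpdateErrorSketch is a hypothetical registry name:

```elixir
# Registry.UpdateErrorSketch is a hypothetical name used only in this sketch.
{:ok, _} = Registry.start_link(:unique, Registry.UpdateErrorSketch)

# No process has registered "missing", so there is nothing to update.
missing = Registry.update_value(Registry.UpdateErrorSketch, "missing", & &1 + 1)
#=> :error

{:ok, _} = Registry.register(Registry.UpdateErrorSketch, "hello", 1)

# "hello" belongs to the current process, so a task cannot update it.
task =
  Task.async(fn ->
    Registry.update_value(Registry.UpdateErrorSketch, "hello", & &1 + 1)
  end)

from_other = Task.await(task)
#=> :error

# The stored value is left untouched.
unchanged = Registry.lookup(Registry.UpdateErrorSketch, "hello")
#=> [{self(), 1}]
```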