DistributedSupervisor (Distr…visor v0.5.5)

DistributedSupervisor is a specialized dynamic supervisor designed to work transparently in distributed Erlang/Elixir environments. It extends the functionality of Elixir's built-in DynamicSupervisor by providing automatic process distribution across connected nodes.

Architecture and Process Distribution

The supervisor leverages several components to achieve distributed process management:

  • :pg for distributed process groups and registry
  • HashRing (via libring) for consistent hash-based process distribution
  • Local DynamicSupervisor instances on each node
  • Internal process registry for child tracking
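
For orientation on the first component, the sketch below uses only OTP's :pg module (available since OTP 23) to join a process to a group and list the group's members. The scope :demo_scope, the group :workers, and the PgSketch module are hypothetical names; how DistributedSupervisor names its own scopes and groups is not stated here.

```elixir
# Minimal :pg sketch. PgSketch, :demo_scope and :workers are hypothetical names.
defmodule PgSketch do
  # Starts a :pg scope (tolerating one that is already running), joins the
  # calling process to the group, and returns the group's current members
  # across all connected nodes.
  def join_and_list(scope \\ :demo_scope, group \\ :workers) do
    case :pg.start_link(scope) do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
    end

    :ok = :pg.join(scope, group, self())
    :pg.get_members(scope, group)
  end
end
```

A process that joins the same group more than once appears in the member list once per join.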

When a child process is started through DistributedSupervisor:

  1. A target node is selected (using consistent hashing or explicit assignment)
  2. The process is started on the designated node via the local DynamicSupervisor
  3. Process information is registered in the internal registry
  4. Child processes can be monitored for failures and redistributed if needed
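
Step 1 can be illustrated with a simplified hash ring. This is a sketch of the general consistent hashing technique, not libring's implementation: the module name RingSketch, the number of points per node, and the use of :erlang.phash2/2 are all illustrative assumptions.

```elixir
defmodule RingSketch do
  @moduledoc false
  # Simplified consistent-hash ring. NOT libring's implementation; the module
  # name, @replicas and the hash function are illustrative choices.

  # 2^32, the largest range :erlang.phash2/2 accepts.
  @space 4_294_967_296
  @replicas 64

  # Places several points per node on the ring and keeps them sorted by hash.
  def new(nodes) do
    points =
      for node <- nodes, i <- 1..@replicas do
        {:erlang.phash2({node, i}, @space), node}
      end

    Enum.sort(points)
  end

  # The owner is the node at the first point at or after the key's hash,
  # wrapping around to the first point when the hash lies past the last one.
  def node_for([], _key), do: nil

  def node_for(ring, key) do
    hash = :erlang.phash2(key, @space)

    case Enum.find(ring, fn {point, _node} -> point >= hash end) do
      {_point, node} -> node
      nil -> ring |> hd() |> elem(1)
    end
  end
end
```

With a ring built as RingSketch.new([:a@host, :b@host, :c@host]), removing a node only moves the keys that node owned; every other key keeps its owner, which is the property that makes this scheme attractive for redistribution.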

Supervision Strategy

DistributedSupervisor uses a one-for-one supervision strategy for child processes. Each child is supervised independently, and if a child process terminates, only that process is restarted according to its restart strategy (:permanent, :transient, or :temporary).

In a distributed context:

  • When a node goes down, child processes can be restarted on other nodes (if cache_children? is enabled)
  • When a new node joins, it becomes available for process distribution
  • Process redistribution is handled automatically via a consistent hashing algorithm
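
Reacting to nodes joining and leaving is typically built on :net_kernel.monitor_nodes/1, which delivers {:nodeup, node} and {:nodedown, node} messages to the subscribing process. The sketch below is a hypothetical helper, not part of DistributedSupervisor, that keeps a node list current from those messages; a real implementation would rebuild its hash ring at the same points.

```elixir
defmodule NodeWatcherSketch do
  # Hypothetical helper: tracks known nodes from :nodeup / :nodedown messages.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns the sorted list of nodes currently known to the watcher.
  def nodes(server), do: GenServer.call(server, :nodes)

  @impl true
  def init(:ok) do
    # Subscribe to node status messages. The return value is ignored on
    # purpose so the sketch also starts on a node without distribution.
    _ = :net_kernel.monitor_nodes(true)
    {:ok, Enum.sort([node() | Node.list()])}
  end

  @impl true
  def handle_call(:nodes, _from, nodes), do: {:reply, nodes, nodes}

  @impl true
  def handle_info({:nodeup, node}, nodes) do
    {:noreply, Enum.sort(Enum.uniq([node | nodes]))}
  end

  def handle_info({:nodedown, node}, nodes) do
    {:noreply, List.delete(nodes, node)}
  end

  def handle_info(_other, nodes), do: {:noreply, nodes}
end
```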

Basic Usage

Starting a DistributedSupervisor

# Start a distributed supervisor with minimal configuration
{:ok, pid} = DistributedSupervisor.start_link(name: MyApp.DistSup)

# Start with more options
{:ok, pid} = DistributedSupervisor.start_link(
  name: MyApp.DistSup,
  cache_children?: true,
  monitor_nodes: true
)

Starting Child Processes

# Start a child process (will be distributed automatically)
{:ok, pid, ref} = DistributedSupervisor.start_child(MyApp.DistSup, MyWorker)

# Start a child with a specific name
{:ok, pid, MyWorker} = DistributedSupervisor.start_child(
  MyApp.DistSup,
  {MyWorker, name: MyWorker}
)

# Start a child on a specific node
{:ok, pid, name} = DistributedSupervisor.start_child(
  MyApp.DistSup,
  MyWorker,
  :"node1@host"
)

Interacting with Children

# Call a specific child process
result = DistributedSupervisor.call(MyApp.DistSup, MyWorker, :get_status)

# Send a cast to a child process
:ok = DistributedSupervisor.cast(MyApp.DistSup, MyWorker, {:update, value})

# Send a direct message
DistributedSupervisor.send(MyApp.DistSup, MyWorker, {:ping, self()})

# Get information about all children
children = DistributedSupervisor.children(MyApp.DistSup)
# => %{worker_name => {pid, child_spec}, ...}

# Find a specific child's PID
pid = DistributedSupervisor.whereis(MyApp.DistSup, MyWorker)

# Find a child's name from its PID
name = DistributedSupervisor.whois(MyApp.DistSup, pid)

# Terminate a child
:ok = DistributedSupervisor.terminate_child(MyApp.DistSup, pid)
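
MyWorker is not defined in the examples above. A minimal worker that would answer the messages they use (:get_status, {:update, value}, {:ping, pid}) might look like the sketch below. It assumes that the supervisor hands the process name to start_link/1 in the :name option, in a form GenServer.start_link/3 accepts; that detail is an assumption, not something stated in this section.

```elixir
defmodule MyWorker do
  # Hypothetical worker matching the messages used in the usage examples.
  use GenServer

  # Assumption: the name arrives in the :name option (for example as a via
  # tuple). When it is absent, the process starts unnamed.
  def start_link(opts \\ []) do
    {name, opts} = Keyword.pop(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts), do: {:ok, %{value: Keyword.get(opts, :value), pings: 0}}

  # Replies with the whole state.
  @impl true
  def handle_call(:get_status, _from, state), do: {:reply, state, state}

  # Replaces the stored value.
  @impl true
  def handle_cast({:update, value}, state), do: {:noreply, %{state | value: value}}

  # Answers a plain message with :pong and counts it.
  @impl true
  def handle_info({:ping, from}, state) do
    send(from, :pong)
    {:noreply, %{state | pings: state.pings + 1}}
  end
end
```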

Configuration Options

The DistributedSupervisor accepts the following configuration options:

Required Options

  • :name - The unique name for this distributed supervisor instance
    DistributedSupervisor.start_link(name: MyApp.DistSup)

Optional Configuration

  • :cache_children? (default: true) - Whether to cache children information

    When enabled, the supervisor maintains a cache of child processes, allowing them to be restarted on other nodes if their original node goes down. Disabling this option reduces memory usage but removes the ability to recover processes after node failures.

    # Enable child caching (better fault tolerance)
    DistributedSupervisor.start_link(
      name: MyApp.DistSup,
      cache_children?: true
    )

  • :nodes (default: []) - Specific nodes to distribute processes across

    By default, all connected nodes are used. Specifying a subset can improve performance and predictability.

    # Use only specific nodes
    DistributedSupervisor.start_link(
      name: MyApp.DistSup,
      nodes: [:node1@host, :node2@host]
    )

  • :monitor_nodes (default: false) - Monitor node connections/disconnections

    When set to true, the hash ring will be automatically updated when nodes join or leave the cluster.

    # Automatically adapt to cluster changes
    DistributedSupervisor.start_link(
      name: MyApp.DistSup,
      monitor_nodes: true
    )

  • :listeners (default: []) - Modules implementing the DistributedSupervisor.Listener behaviour

    Listeners receive notifications about process and node lifecycle events.

    # Single listener
    DistributedSupervisor.start_link(
      name: MyApp.DistSup,
      listeners: MyApp.ProcessListener
    )
    
    # Multiple listeners
    DistributedSupervisor.start_link(
      name: MyApp.DistSup,
      listeners: [MyApp.ProcessListener, MyApp.MetricsListener]
    )

Advanced Features and Usage

Process Distribution Control

Check process distribution information:

# Get all nodes in the distribution ring
nodes = DistributedSupervisor.nodes(MyApp.DistSup)

# Determine which node would handle a specific key
node = DistributedSupervisor.node_for(MyApp.DistSup, :some_key)

# Check if current node owns a specific key
is_mine = DistributedSupervisor.mine?(MyApp.DistSup, :some_key)

Custom Process Names and Via Tuples

Use custom names for processes and interact with them using standard GenServer functions:

# Get a via tuple for use with standard GenServer functions
via = DistributedSupervisor.via_name(MyApp.DistSup, MyWorker)

# Use with standard GenServer calls
GenServer.call(via, :message)
GenServer.cast(via, :message)

Process Restart Strategies

Configure how processes behave on termination:

# Permanent process (restarted for any reason)
DistributedSupervisor.start_child(
  MyApp.DistSup,
  {MyWorker, name: :worker1, restart: :permanent}
)

# Transient process (restarted only on abnormal termination)
DistributedSupervisor.start_child(
  MyApp.DistSup,
  {MyWorker, name: :worker2, restart: :transient}
)

# Temporary process (never restarted)
DistributedSupervisor.start_child(
  MyApp.DistSup,
  {MyWorker, name: :worker3, restart: :temporary}
)
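
For comparison, the standard library sets the restart value in the child specification itself. The sketch below shows the two usual ways to do that with plain Supervisor helpers; RestartSketch.Worker is a hypothetical module. Whether DistributedSupervisor also honours a restart value set this way, in addition to the restart: option shown above, is not stated in this section and should be verified.

```elixir
defmodule RestartSketch.Worker do
  # `use GenServer, restart: :transient` sets the default in child_spec/1.
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

# The module-level default ends up in the generated child spec...
%{restart: :transient} = RestartSketch.Worker.child_spec([])

# ...and can be overridden per child without touching the module.
%{restart: :temporary} =
  Supervisor.child_spec({RestartSketch.Worker, []}, restart: :temporary)
```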

Types

id()

@type id() :: term()

The id of the child process within this instance of the distributed supervisor. It may be any term. If it is not passed explicitly to DistributedSupervisor.start_child/2, a reference is created automatically and returned as the third element of the {:ok, pid, child_name} success tuple.

name()

@type name() :: atom()

The name of the instance of this distributed supervisor. Unlike GenServer.name/0, it must be an atom.

Functions

call(name, child, msg, timeout \\ 5000)

@spec call(name(), id(), msg, timeout()) :: result when msg: term(), result: term()

Syntactic sugar for GenServer.call/3 that calls a dynamically supervised GenServer by registry name and key.

Examples

iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup11)
iex> {:ok, _pid, :call_test} = DistributedSupervisor.start_child(ExampleSup11, {DistributedSupervisor.Test.GenServer, name: :call_test})
iex> result = DistributedSupervisor.call(ExampleSup11, :call_test, :state)
iex> match?({{_, _}, _}, result)
true

cast(name, child, msg)

@spec cast(name(), id(), msg) :: :ok when msg: term()

Syntactic sugar for GenServer.cast/2 that casts a message to a dynamically supervised GenServer by registry name and key.

Examples

iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup12)
iex> {:ok, _pid, :cast_test} = DistributedSupervisor.start_child(ExampleSup12, {DistributedSupervisor.Test.GenServer, name: :cast_test})
iex> DistributedSupervisor.cast(ExampleSup12, :cast_test, :inc)
:ok

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
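
Because child_spec/1 is defined, the supervisor can be listed in an application's supervision tree as a {module, opts} tuple. The sketch below assumes that child_spec/1 forwards its argument to start_link/1, which is the usual convention but is not confirmed here; MyApp.Application and MyApp.Supervisor are hypothetical names.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; MyApp.DistSup is the name used in the
  # usage examples.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Resolved through DistributedSupervisor.child_spec/1 (assumed to pass
      # these options on to DistributedSupervisor.start_link/1).
      {DistributedSupervisor, name: MyApp.DistSup, cache_children?: true}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```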

children(name)

@spec children(name()) ::
  %{optional(term()) => {pid(), Supervisor.child_spec()}}
  | [
      {:undefined, pid() | :restarting, :worker | :supervisor,
       [module()] | :dynamic}
    ]

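Returns the children of the supervisor instance registered under name. Per the typespec, this is a map from registered names to {pid, child_spec} tuples, or a list in the shape returned by DynamicSupervisor.which_children/1.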

Examples

iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup8)
iex> {:ok, _pid, :child1} = DistributedSupervisor.start_child(ExampleSup8, {DistributedSupervisor.Test.GenServer, name: :child1})
iex> {:ok, _pid, :child2} = DistributedSupervisor.start_child(ExampleSup8, {DistributedSupervisor.Test.GenServer, name: :child2})
iex> children = DistributedSupervisor.children(ExampleSup8)
iex> is_map(children) and map_size(children) >= 2
true
iex> Map.has_key?(children, :child1) and Map.has_key?(children, :child2)
true

local_children(name)

@spec local_children(name()) :: [pid()]

Returns the pids of the children running on the local node.

mine?(name, key)

Returns true if called from the node assigned to this key, false otherwise.

node_for(name, key)

Returns the node assigned to the given key according to the HashRing.

nodes(name)

Returns the list of nodes managed by the registered ring.

send(name, child, msg)

@spec send(name(), id(), msg) :: msg | nil when msg: term()

Syntactic sugar for Kernel.send/2 that sends a message to a dynamically supervised GenServer identified by registry name and key.

Returns the message if the process exists. For an unknown child the typespec declares nil, while the example below returns :undefined.

Examples

iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup13)
iex> {:ok, _pid, :send_test} = DistributedSupervisor.start_child(ExampleSup13, {DistributedSupervisor.Test.GenServer, name: :send_test})
iex> DistributedSupervisor.send(ExampleSup13, :send_test, :test_message)
:test_message
iex> DistributedSupervisor.send(ExampleSup13, :nonexistent, :test_message)
:undefined

start_child(name, spec, node \\ nil)

@spec start_child(name(), Supervisor.child_spec() | {module(), term()}, node() | nil) ::
  DynamicSupervisor.on_start_child()

Dynamically adds a child specification to supervisor and starts that child.

spec should be a valid child specification, as detailed in the "Child specification" section of the Supervisor documentation, expressed either as a Supervisor.child_spec/0 or as a {module, start_link_arg} tuple.

The child process is started as defined in the child specification. The core difference from DynamicSupervisor is that the process must be named. The name may be any term and is passed through the name: option in the call to this function. If the name: option is not passed, a name is generated automatically (a reference in the examples below) and returned as the third element of the tuple from start_child/2.

This function accepts an optional third argument, node. If it is passed, the process is started on that node; otherwise the node is chosen according to the hash ring.

Examples

# Start a simple child process with module only (auto-generated name)
iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup3)
iex> {:ok, pid, child_name} = DistributedSupervisor.start_child(ExampleSup3, DistributedSupervisor.Test.GenServer)
iex> is_pid(pid) and is_reference(child_name)
true

# Start with tuple format and explicit name
iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup4)
iex> {:ok, pid, NamedWorker} = DistributedSupervisor.start_child(
...>   ExampleSup4,
...>   {DistributedSupervisor.Test.GenServer, name: NamedWorker}
...> )
iex> is_pid(pid)
true

# Start with explicit child specification
iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup5)
iex> child_spec = %{
...>   id: :test_child,
...>   start: {DistributedSupervisor.Test.GenServer, :start_link, name: SpecWorker}
...> }
iex> {:ok, pid, SpecWorker} = DistributedSupervisor.start_child(ExampleSup5, child_spec)
iex> is_pid(pid)
true

See: DynamicSupervisor.start_child/2

start_link(opts \\ [])

Starts the DistributedSupervisor.

  • :name (atom/0) - Required. The unique ID of this DistributedSupervisor, used to address it, similar to DynamicSupervisor.name().

  • :cache_children? (boolean/0) - If true, the registry caches children as a map of %{name => %{pid() => initial_params}}. Setting this to false disables restarting a process on another node when a node goes down. The default value is true.

  • :nodes (list of atom/0) - The hardcoded list of nodes to spread children across. If not passed, all connected nodes are used. The default value is [].

  • :monitor_nodes (atom/0) - If not false, the HashRing is updated automatically when nodes join or leave the cluster. The default value is false.

  • :listeners - One or more implementations of DistributedSupervisor.Listener that receive notifications upon process restarts. The default value is [].

Examples

# Start with minimal required configuration (just the name)
iex> {:ok, pid} = DistributedSupervisor.start_link(name: ExampleSup1)
iex> is_pid(pid)
true

# Start with additional options
iex> {:ok, pid} = DistributedSupervisor.start_link(
...>   name: ExampleSup2,
...>   cache_children?: true,
...>   monitor_nodes: true,
...>   nodes: [node()]
...> )
iex> is_pid(pid)
true

terminate_child(name, pid)

@spec terminate_child(name(), pid() | nil) :: :ok | {:error, :not_found}

Terminates the given child identified by pid.

If successful, this function returns :ok. If there is no process with the given PID, this function returns {:error, :not_found}.

Examples

# Start and then terminate a child process
iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup6)
iex> {:ok, pid, _} = DistributedSupervisor.start_child(ExampleSup6, DistributedSupervisor.Test.GenServer)
iex> DistributedSupervisor.terminate_child(ExampleSup6, pid)
:ok

# Try to terminate a non-existent PID
iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup7)
iex> DistributedSupervisor.terminate_child(ExampleSup7, nil)
{:error, :not_found}

See: DynamicSupervisor.terminate_child/2

via_name(name, id)

@spec via_name(name(), id()) :: {:via, module(), {name(), id()}}

Returns a fully qualified name for use with standard library functions that accept {:via, Registry, key} as a GenServer name.

whereis(name, child)

@spec whereis(name(), id()) :: pid() | nil

Returns the pid() registered under key in the supervisor instance named name. For an unknown key the typespec declares nil, while the example below returns :undefined.

Examples

iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup9)
iex> {:ok, pid1, :lookup_test} = DistributedSupervisor.start_child(ExampleSup9, {DistributedSupervisor.Test.GenServer, name: :lookup_test})
iex> DistributedSupervisor.whereis(ExampleSup9, :lookup_test) == pid1
true
iex> DistributedSupervisor.whereis(ExampleSup9, :nonexistent_process)
:undefined

See: DistributedSupervisor.children/1

whois(name, pid)

@spec whois(name(), pid()) :: name() | nil

Returns the name registered for the given pid(), or nil if the pid is not registered.

Examples

iex> {:ok, _sup} = DistributedSupervisor.start_link(name: ExampleSup10)
iex> {:ok, pid, :whois_test} = DistributedSupervisor.start_child(ExampleSup10, {DistributedSupervisor.Test.GenServer, name: :whois_test})
iex> DistributedSupervisor.whois(ExampleSup10, pid)
:whois_test
iex> non_existing_pid = :erlang.list_to_pid(~c"<0.999.0>")
iex> DistributedSupervisor.whois(ExampleSup10, non_existing_pid)
nil

See: DistributedSupervisor.children/1