ProcessHub (ProcessHub v0.3.2-alpha)

This is the main public API module for the ProcessHub library. It is recommended to interact with the library only through the functions defined in this module.

ProcessHub is a library that distributes processes within a BEAM cluster. It is designed to be used as a building block for distributed applications that require process distribution and synchronization.

Types

@type child_id() :: atom() | binary()

The child_id defines the name of the child. It is used to identify the child. Each child must have a unique child_id() in the cluster. A single child may have multiple pid()s across the cluster.

@type child_spec() :: Supervisor.child_spec()

The child_spec defines the specification of a child process.

@type hub_id() :: atom()

The hub_id defines the name of the hub. It is used to identify the hub.

@type init_opts() :: [
  async_wait: boolean(),
  timeout: non_neg_integer(),
  check_existing: boolean(),
  on_failure: :continue | :rollback
]

The init_opts() defines the options that can be passed to the start_children/3 and start_child/3 functions. The stop_children/3 and stop_child/3 functions take stop_opts() instead.

  • :async_wait - is optional and is used to define whether the function should return another function that can be used to wait for the children to start or stop. The default is false.
  • :timeout - is optional and is used to define the timeout for the function. The timeout option should be used with async_wait: true. The default is 5000 (5 seconds).
  • :check_existing - is optional and is used to define whether the function should check if the children are already started. The default is true.
  • :on_failure - is optional and is used to define the action to take when the start operation fails. The default is :continue which will continue to start the next child. The other option is :rollback which will stop all the children that have been started.
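The defaults listed above can be written out as a keyword list. The following is a minimal sketch that only mirrors the documented defaults; the InitOptsSketch module and its with_defaults/1 function are hypothetical helpers and are not part of ProcessHub.

```elixir
defmodule InitOptsSketch do
  # Defaults as stated in the option list above. This mirrors the documentation,
  # not ProcessHub's internal implementation.
  @defaults [async_wait: false, timeout: 5000, check_existing: true, on_failure: :continue]

  # Overlay caller-supplied options on the documented defaults.
  def with_defaults(opts) when is_list(opts), do: Keyword.merge(@defaults, opts)
end

# Wait for the result and roll back already started children on failure.
opts = InitOptsSketch.with_defaults(async_wait: true, on_failure: :rollback)
IO.inspect(opts)
```

Options that are not overridden, such as :timeout and :check_existing here, keep their documented defaults.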
@type reply_to() :: [pid()]

The reply_to defines the pid()s that will receive the response from the hub when a child is started or stopped.

@type start_failure() :: {child_id(), node(), term()}

Defines the failure result for a single child start operation.

The term() can be any term that describes the failure reason.

@type start_result() :: {child_id(), [{node(), pid()}]}

Defines the success result for a single child start operation.

@type stop_failure() :: {child_id(), [term()]}

Defines the failure result for a single child stop operation.

Each term() in the list can be any term that describes a failure reason.

@type stop_opts() :: [async_wait: boolean(), timeout: non_neg_integer()]

The stop_opts() defines the options that can be passed to the stop_children/3 and stop_child/3 functions.

  • :async_wait - is optional and is used to define whether the function should return another function that can be used to wait for the children to stop. The default is false.
  • :timeout - is optional and is used to define the timeout for the function. The timeout option should be used with async_wait: true. The default is 5000 (5 seconds).

@type stop_result() :: {child_id(), [node()]}

Defines the success result for a single child stop operation.

t()

This is the base configuration structure for the hub and has to be passed to the start_link/1 function.

  • :hub_id is the name of the hub and is required.
  • :child_specs is optional and is used to define the child processes that will be started with the hub statically.
  • :hooks are optional and are used to define the hooks that can be triggered on specific events.
  • :redundancy_strategy is optional and is used to define the strategy for redundancy. The default is ProcessHub.Strategy.Redundancy.Singularity.
  • :migration_strategy is optional and is used to define the strategy for migration. The default is ProcessHub.Strategy.Migration.ColdSwap.
  • :synchronization_strategy is optional and is used to define the strategy for synchronization. The default is ProcessHub.Strategy.Synchronization.PubSub.
  • :partition_tolerance_strategy is optional and is used to define the strategy for partition tolerance. The default is ProcessHub.Strategy.PartitionTolerance.Divergence.
  • :distribution_strategy is optional and is used to define the strategy for process distribution. The default is ProcessHub.Strategy.Distribution.ConsistentHashing.
  • :hubs_discover_interval is optional and is used to define the interval in milliseconds for hubs to start the discovery process. The default is 60000 (1 minute).
  • :deadlock_recovery_timeout is optional and is used to define the timeout in milliseconds to recover from a locked hub. Hub locking can happen for different reasons, such as updating internal data, migrating processes, or handling network partitions. The default is 60000 (1 minute).
  • :storage_purge_interval is optional and is used to define the interval in milliseconds for the janitor to clean up the old cache records when the TTL expires. The default is 15000 (15 seconds).
  • :migr_base_timeout is optional and is used to define the base timeout in milliseconds for the migration process to complete before the hub considers it as a failure. The default is 15000 (15 seconds).
  • :dsup_max_restarts is optional and is used to define the maximum number of restarts allowed for the distributed supervisor within the :dsup_max_seconds time frame. See Supervisor child specification for more information. The default is 100.
  • :dsup_max_seconds is optional and is used to define the time frame in seconds in which :dsup_max_restarts applies. See Supervisor child specification for more information. The default is 4.
  • :dsup_shutdown_timeout is optional and is used to define the timeout in milliseconds for the distributed supervisor to wait before forcefully terminating itself when receiving a shutdown signal.

Functions

@spec await(function() | {:error, term()}) ::
  {:ok, [start_result() | stop_result()]}
  | {:error,
     {[start_failure() | stop_failure()], [start_result() | stop_result()]}}
  | {:error,
     {[start_failure() | stop_failure()], [start_result() | stop_result()]},
     :rollback}

This function can be used to wait for the ProcessHub child start or stop functions to complete.

The await/1 function should be used with the async_wait: true option.

Keep in mind that the await/1 function will block the calling process until the response is received. If the response is not received within the timeout period, the function will return {:error, term()}.

Example

iex> ref = ProcessHub.start_child(:my_hub, child_spec, [async_wait: true])
iex> ProcessHub.await(ref)
{:ok, {:my_child, [{:mynode, #PID<0.123.0>}]}}
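The result shapes listed in the await/1 spec can be handled with pattern matching. The sketch below is one possible way to classify them; AwaitResultSketch and summarize/1 are hypothetical names, and the sample tuples are hand-written stand-ins for real hub responses.

```elixir
defmodule AwaitResultSketch do
  # Classify the result shapes listed in the await/1 spec.
  # List.wrap/1 lets the same helper accept a single result tuple
  # (as in the example above) or a list of them.
  def summarize({:ok, results}), do: {:ok, child_ids(results)}

  def summarize({:error, {failures, successes}, :rollback}),
    do: {:rolled_back, child_ids(failures), child_ids(successes)}

  def summarize({:error, {failures, successes}})
      when is_list(failures) and is_list(successes),
      do: {:partial, child_ids(failures), child_ids(successes)}

  # Any other error term is passed through unchanged.
  def summarize({:error, reason}), do: {:failed, reason}

  # The child_id is always the first element of a result or failure tuple.
  defp child_ids(results), do: results |> List.wrap() |> Enum.map(&elem(&1, 0))
end

# Hand-written sample in the shape of a successful single-child start.
IO.inspect(AwaitResultSketch.summarize({:ok, {:my_child, [{:mynode, self()}]}}))
# prints {:ok, [:my_child]}
```

Matching the :rollback clause before the generic error clauses keeps the three-element tuple from being mistaken for a plain failure.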

child_lookup(hub_id, child_id)

@spec child_lookup(hub_id(), child_id()) :: {child_spec(), [{node(), pid()}]} | nil

Returns information about processes that are registered with the given child_id/0.

This function queries results from the local ETS table and does not make any network calls.

The return results contain the child_spec/0 and a list of tuples where the first element is the node where the child is started, and the second element is the pid() of the started child.

Example

iex> {_child_spec, _node_pid_tuples} = ProcessHub.child_lookup(:my_hub, :my_child)
{%{id: :my_child, start: {MyProcess, :start_link, []}}, [{:mynode, #PID<0.123.0>}]}
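Because child_lookup/2 may return nil, callers usually need to handle both cases. The following sketch extracts the node names from a lookup result; LookupSketch and nodes_running/1 are hypothetical helpers, and the lookup value is a hand-written stand-in for a real result.

```elixir
defmodule LookupSketch do
  # Accepts the value returned by ProcessHub.child_lookup/2:
  # {child_spec, [{node, pid}]} when the child is registered, nil otherwise.
  def nodes_running(nil), do: []

  def nodes_running({_child_spec, node_pids}),
    do: Enum.map(node_pids, fn {node, _pid} -> node end)
end

# Hand-written sample in the documented shape; self() stands in for the child pid.
lookup = {%{id: :my_child, start: {MyProcess, :start_link, []}}, [{:mynode, self()}]}
IO.inspect(LookupSketch.nodes_running(lookup))
# prints [:mynode]
```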
@spec child_spec(t()) :: %{
  id: ProcessHub,
  start: {ProcessHub.Initializer, :start_link, [...]},
  type: :supervisor
}

Returns the child specification for the ProcessHub.Initializer supervisor.


get_pid(hub_id, child_id)

@spec get_pid(hub_id(), child_id()) :: pid() | nil

Returns the first pid for the given child_id.

Although this function is a handy way to quickly get the pid of a child, it is not recommended when children are replicated, because it returns only the first pid.

Example

iex> ProcessHub.get_pid(:my_hub, :my_child)
#PID<0.123.0>
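When a child is replicated, one alternative is to fetch every pid with get_pids/2 and choose among them explicitly. The sketch below prefers a replica on a given node and falls back to the first pid; PidPickSketch and pick/2 are hypothetical helpers, not part of ProcessHub.

```elixir
defmodule PidPickSketch do
  # Prefer a replica running on `preferred_node`; otherwise fall back to the first pid.
  # Returns nil for an empty list.
  def pick(pids, preferred_node \\ node()) when is_list(pids) do
    Enum.find(pids, List.first(pids), fn pid -> node(pid) == preferred_node end)
  end
end

# In real code the list would come from ProcessHub.get_pids(:my_hub, :my_child).
pids = [self()]
IO.inspect(PidPickSketch.pick(pids) == self())
# prints true
```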

get_pids(hub_id, child_id)

@spec get_pids(hub_id(), child_id()) :: [pid()]

Returns a list of pids for the given child_id.

Example

iex> ProcessHub.get_pids(:my_hub, :my_child)
[#PID<0.123.0>]

@spec is_alive?(hub_id()) :: boolean()

Checks if the ProcessHub with the given hub_id/0 is alive.

A hub is considered alive if the ProcessHub.Initializer supervisor process is running along with the required child processes for the hub to function.

Example

iex> ProcessHub.is_alive?(:not_existing)
false

@spec is_locked?(hub_id()) :: boolean()

Checks if the ProcessHub with the given hub_id/0 is locked.

A hub is considered locked if the ProcessHub local event queue has a priority level greater than or equal to 10. Locking is used to throttle the hub from processing any new events and to preserve data integrity.

Example

iex> ProcessHub.is_locked?(:my_hub)
false

@spec is_partitioned?(hub_id()) :: boolean()

Checks if the ProcessHub with the given hub_id/0 is in a network-partitioned state.

A hub is considered partitioned if the ProcessHub.Strategy.PartitionTolerance strategy has detected a network partition. When a network partition is detected, the hub will terminate the ProcessHub.DistributedSupervisor process along with its children.

Example

iex> ProcessHub.is_partitioned?(:my_hub)
false
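The three status checks is_alive?/1, is_locked?/1, and is_partitioned?/1 can be combined into a single readiness check. This is only a sketch: HubStatusSketch and ready?/2 are hypothetical, and the hub module is passed as a parameter purely so that the example can be run against a stub without a running hub.

```elixir
defmodule HubStatusSketch do
  # `hub` defaults to ProcessHub; it is a parameter only so that the sketch
  # can be exercised against a stub module.
  def ready?(hub_id, hub \\ ProcessHub) do
    hub.is_alive?(hub_id) and not hub.is_locked?(hub_id) and
      not hub.is_partitioned?(hub_id)
  end
end

# Stub that mimics the documented boolean return values.
defmodule StubHub do
  def is_alive?(:up), do: true
  def is_alive?(_other), do: false
  def is_locked?(_hub_id), do: false
  def is_partitioned?(_hub_id), do: false
end

IO.inspect(HubStatusSketch.ready?(:up, StubHub))
# prints true
```

Because `and` short-circuits, the lock and partition checks are skipped when the hub is not alive.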

nodes(hub_id, opts \\ [])

@spec nodes(hub_id(), [:include_local] | nil) :: [node()]

Returns a list of nodes where the ProcessHub with the given hub_id/0 is running.

Nodes where the ProcessHub is running with the same hub_id/0 are considered to be part of the same cluster.

Example

iex> ProcessHub.nodes(:my_hub, [:include_local])
[:remote_node]

process_list(hub_id, scope)

@spec process_list(hub_id(), :global | :local) :: [
  {child_id(), [{node(), pid()}] | pid()}
]

Returns a list of processes that are registered.

The process list contains the child_id/0 and depending on the scope option, it may contain the node and pid() of the child.

This function queries results from the local ETS table and does not make any network calls.

Available options:

  • :global - returns a list of all child processes on all nodes in the cluster. The return result will be in the format of [{child_id, [{node, pid}]}].
  • :local - returns a list of child processes that belong to the local node. The return result will be in the format of [{child_id, pid}].

Example

iex> ProcessHub.process_list(:my_hub, :global)
[
  {:my_child1, [{:node1, #PID<0.123.0>}, {:node2, #PID<2.123.0>}]},
  {:my_child2, [{:node2, #PID<5.124.0>}]}
]
iex> ProcessHub.process_list(:my_hub, :local)
[{:my_child1, #PID<0.123.0>}]
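The :global result is keyed by child, but it can be regrouped by node when that view is more useful. The following is a minimal sketch; ProcessListSketch and by_node/1 are hypothetical helpers, and the input list is a hand-written stand-in shaped like the :global example above.

```elixir
defmodule ProcessListSketch do
  # Turn the :global shape [{child_id, [{node, pid}]}] into %{node => [child_id]}.
  def by_node(global_list) do
    pairs =
      for {child_id, node_pids} <- global_list,
          {node, _pid} <- node_pids,
          do: {node, child_id}

    Enum.group_by(pairs, fn {node, _id} -> node end, fn {_node, id} -> id end)
  end
end

# self() stands in for the child pids of a real registry.
list = [
  {:my_child1, [{:node1, self()}, {:node2, self()}]},
  {:my_child2, [{:node2, self()}]}
]

IO.inspect(ProcessListSketch.by_node(list))
# prints %{node1: [:my_child1], node2: [:my_child1, :my_child2]}
```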

process_registry(hub_id)

@spec process_registry(hub_id()) :: ProcessHub.Service.ProcessRegistry.registry()

Returns all information in the registry.

This function queries results from the local ETS table and does not make any network calls.


promote_to_node(hub_id, node_name \\ node())

Promotes the ProcessHub with the given hub_id/0 to a node.

This function should be used when the ProcessHub has been started in a non-node mode and you want to promote it to a node.

The function will update all existing child processes on the registry to match the new node name.

Optionally, you can pass the node_name argument to specify the name of the node. By default, the current node name will be used.


start_child(hub_id, child_spec, opts \\ [])

@spec start_child(hub_id(), child_spec(), init_opts()) ::
  {:ok, :start_initiated}
  | {:error, :no_children | {:already_started, [atom() | binary(), ...]}}
  | (-> {:ok, start_result()})
  | (-> {:error, start_failure()}
        | {:error, start_failure(), start_result(), :rollback})

Starts a child process that will be distributed across the cluster. The :id of the child_spec/0 must be unique.

Example

iex> child_spec = %{id: :my_child, start: {MyProcess, :start_link, []}}
iex> ProcessHub.start_child(:my_hub, child_spec)
{:ok, :start_initiated}

By default, the start_child/3 function is asynchronous and returns immediately. To wait for the child to start, you can pass async_wait: true to the opts argument. When async_wait: true, you must await the response from the function.

See init_opts/0 for more options.

Example with synchronous wait

The synchronous response is a tuple whose first element is the status, :ok or :error. On success, the second element is a tuple containing the child_id/0 and a list of {node, pid} tuples, where node is the node on which the child was started and pid is the pid() of the started child. By default, the list contains only one tuple, but if the redundancy strategy is configured for replicas, it may contain more than one.

iex> child_spec = %{id: :my_child, start: {MyProcess, :start_link, []}}
iex> ProcessHub.start_child(:my_hub, child_spec, [async_wait: true]) |> ProcessHub.await()
{:ok, {:my_child, [{:mynode, #PID<0.123.0>}]}}
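The start-and-await flow above can be wrapped in a small helper that extracts the started pids. This is a sketch under stated assumptions: StartChildSketch, start_and_wait/3, and the 10_000 ms timeout are illustrative, and the hub module is injectable only so that the example can run against the stub shown here instead of a live cluster.

```elixir
defmodule StartChildSketch do
  # `hub` defaults to ProcessHub and is injectable only for exercising the sketch.
  def start_and_wait(hub_id, child_spec, hub \\ ProcessHub) do
    # With async_wait: true the call returns a function (or an error tuple)
    # that is then handed to await/1.
    wait_fun = hub.start_child(hub_id, child_spec, async_wait: true, timeout: 10_000)

    case hub.await(wait_fun) do
      {:ok, {child_id, node_pids}} -> {:ok, child_id, Enum.map(node_pids, &elem(&1, 1))}
      error -> error
    end
  end
end

# Stub that mimics the documented return shapes of start_child/3 and await/1.
defmodule StubStartHub do
  def start_child(_hub_id, %{id: id}, _opts), do: fn -> {:ok, {id, [{node(), self()}]}} end
  def await(fun) when is_function(fun, 0), do: fun.()
  def await({:error, _reason} = error), do: error
end

child_spec = %{id: :my_child, start: {Agent, :start_link, [fn -> 0 end]}}
IO.inspect(StartChildSketch.start_and_wait(:my_hub, child_spec, StubStartHub))
```

Against a real hub, the third argument would be omitted so that the calls go to ProcessHub itself.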

start_children(hub_id, child_specs, opts \\ [])

@spec start_children(hub_id(), [child_spec()], init_opts()) ::
  {:ok, :start_initiated}
  | {:error,
     :no_children
     | {:error, :children_not_list}
     | {:already_started, [atom() | binary(), ...]}}
  | (-> {:ok, [start_result()]})
  | (-> {:error, [start_failure()], [start_result()]}
        | {:error, [start_failure()], [start_result()], :rollback})

Starts multiple child processes that will be distributed across the cluster.

Same as start_child/3, except it starts multiple children at once and is more efficient than calling start_child/3 multiple times.

Warning

Using start_children/3 with async_wait: true can lead to timeout errors, especially when the number of children is large. When async_wait: true, you must await the response from the function.
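Since every child needs a unique child_id/0, a batch of child specs is often generated programmatically. The sketch below builds specs with binary ids; SpecBuilderSketch and build/2 are hypothetical helpers, and MyProcess is the placeholder module used in the examples above.

```elixir
defmodule SpecBuilderSketch do
  # Build `count` child specs with unique binary ids ("worker_1", "worker_2", ...).
  # `module` is assumed to export start_link/0.
  def build(module, count) when is_integer(count) and count > 0 do
    for i <- 1..count do
      %{id: "worker_#{i}", start: {module, :start_link, []}}
    end
  end
end

specs = SpecBuilderSketch.build(MyProcess, 3)
IO.inspect(Enum.map(specs, & &1.id))
# prints ["worker_1", "worker_2", "worker_3"]
```

The resulting list can be passed to start_children/3. When combining it with async_wait: true for a large batch, a :timeout above the 5000 ms default may be needed; the right value depends on the cluster and is not specified here.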


start_link(hub_settings)

@spec start_link(t()) :: {:ok, pid()} | {:error, term()}

Starts the ProcessHub with the given hub_id/0 and settings.

It is recommended to start the ProcessHub under a supervision tree.
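A hub can be placed under a supervision tree by using child_spec/1 in the children list of an application supervisor. The sketch below assumes that the configuration structure is written as %ProcessHub{} and that the process_hub dependency is available; MyApp.Application and MyApp.Supervisor are hypothetical names.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Assumption: the configuration struct is %ProcessHub{}.
      # Only :hub_id is required; all other settings keep their defaults.
      ProcessHub.child_spec(%ProcessHub{hub_id: :my_hub})
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

Supervising the hub this way means it is restarted according to the supervisor's strategy if it terminates unexpectedly.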

@spec stop(atom()) :: :ok | {:error, :not_alive}

Stops the ProcessHub with the given hub_id/0.


stop_child(hub_id, child_id, opts \\ [])

@spec stop_child(hub_id(), child_id(), stop_opts()) ::
  {:ok, :stop_initiated}
  | (-> {:ok, stop_result()})
  | (-> {:error, stop_failure()})

Stops a child process in the cluster.

By default, this function is asynchronous and returns immediately. You can wait for the child to stop by passing async_wait: true in the opts argument. When async_wait: true, you must await the response from the function.

Example

iex> ProcessHub.stop_child(:my_hub, :my_child)
{:ok, :stop_initiated}

See stop_opts/0 for more options.

Example with synchronous wait

iex> ProcessHub.stop_child(:my_hub, :my_child, [async_wait: true]) |> ProcessHub.await()
{:ok, {:my_child, [:mynode]}}

stop_children(hub_id, child_ids, opts \\ [])

@spec stop_children(hub_id(), [child_id()], stop_opts()) ::
  {:ok, :stop_initiated}
  | {:error, list()}
  | (-> {:ok, [stop_result()]})
  | (-> {:error, [stop_failure()], [stop_result()]})

Stops multiple child processes in the cluster.

This function is similar to stop_child/3, but it stops multiple children at once, making it more efficient than calling stop_child/3 multiple times.

Warning

Using stop_children/3 with async_wait: true can lead to timeout errors, especially when stopping a large number of child processes.


which_children(hub_id, opts \\ [])

@spec which_children(hub_id(), [:global | :local] | nil) ::
  list()
  | {node(),
     [
       {any(), :restarting | :undefined | pid(), :supervisor | :worker,
        :dynamic | list()}
     ]}

Works similarly to Supervisor.which_children/1, but wraps the result in a tuple containing the node name and the children.

Info

The Supervisor.which_children/1 function is known to consume a lot of memory and this can affect performance. The problem is even more relevant when using the :global option as it will make a network call to all nodes in the cluster.

It is highly recommended to use ProcessHub.process_list/2 instead.

Available options:

  • :global - returns a list of all child processes started by all nodes in the cluster. The return result will be in the format of [{:node, children}].
  • :local - returns a list of all child processes started by the local node. The return result will be in the format of {:node, children}.
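Because :local returns a single tuple and :global returns a list of tuples, code that accepts either form can normalise the result first. The following is a minimal sketch; WhichChildrenSketch and counts/1 are hypothetical helpers, and the inputs are hand-written stand-ins in the documented shapes.

```elixir
defmodule WhichChildrenSketch do
  # :local yields one {node, children} tuple, :global a list of them.
  # List.wrap/1 normalises both to a list before counting children per node.
  def counts(result) do
    result
    |> List.wrap()
    |> Map.new(fn {node, children} -> {node, length(children)} end)
  end
end

# self() stands in for a child pid; the inner tuple follows Supervisor.which_children/1.
local = {:node1, [{:my_child, self(), :worker, [Agent]}]}
IO.inspect(WhichChildrenSketch.counts(local))
# prints %{node1: 1}
```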