Hooks

Hooks are used to trigger events on specific actions. Hooks can be registered by passing the handlers to the :hooks option of the ProcessHub.t/0 configuration struct or by inserting them dynamically using the ProcessHub.Service.HookManager module.

ProcessHub heavily uses hooks internally in the integration tests and strategy implementations.

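Hook handlers have to be defined in MFA (module, function, arguments) form. In the examples in this guide they are written as ProcessHub.Service.HookManager structs, where the :m, :f, and :a fields name the function that will be called when the hook is triggered and :id identifies the handler.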

It is possible to register a hook handler with a wildcard argument :_, which will be replaced with the hook data when the hook is dispatched.
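
To make the wildcard substitution concrete, here is a minimal sketch of the idea in plain Elixir. It is not ProcessHub's implementation: the WildcardSketch module, its dispatch/2 function, and the WildcardSketch.Handler module are hypothetical names used only for illustration, and the handler is a plain map carrying the :m, :f, and :a fields.

```elixir
defmodule WildcardSketch do
  # Hypothetical helper, not part of ProcessHub: replaces every `:_`
  # in the argument list with the hook data, then calls the function.
  def dispatch(%{m: module, f: function, a: args}, hook_data) do
    resolved_args =
      Enum.map(args, fn
        :_ -> hook_data
        other -> other
      end)

    apply(module, function, resolved_args)
  end
end

defmodule WildcardSketch.Handler do
  # Stand-in for a user-defined hook handler.
  def my_function(:some_data, hook_data), do: {:handled, hook_data}
end

handler = %{m: WildcardSketch.Handler, f: :my_function, a: [:some_data, :_]}

# The `:_` placeholder is replaced with the node name before the call.
WildcardSketch.dispatch(handler, :"node1@127.0.0.1")
```

Under these assumptions, the handler function receives :some_data as its first argument and the dispatched node name as its second.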

Hook registration examples

Static hook registration

Static hook registration occurs when all hook handlers are known at the time of hub initialization using the ProcessHub.t/0 configuration struct.

Example:

# Register a hook handler for the `:pre_cluster_join_hook` event with a wildcard argument.
defmodule MyApp.Application do
  alias ProcessHub.Service.HookManager
  alias ProcessHub.Constant.Hook

  use Application

  def start(_type, _args) do
    children = [
      ProcessHub.child_spec(%ProcessHub{
        hub_id: :my_hub,
        hooks: %{
          Hook.pre_cluster_join() => [
            %HookManager{
              id: :hook_id_1,
              m: MyModule,
              f: :my_function,
              a: [:some_data, :_]
            },
            %HookManager{
              id: :hook_id_2,
              m: MyModule2,
              f: :my_function2,
              a: [:some_data]
            }
          ],
          Hook.post_cluster_join() => [
            %HookManager{
              id: :hook_id_3,
              m: MyModule,
              f: :my_function,
              a: [:some_data, :_]
            }
          ]
        }
      })
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Dynamic hook registration

Dynamic hook registration is useful when modifying hook handlers at runtime. It allows us to add new handlers or remove existing ones.

Example:

alias ProcessHub.Service.HookManager
alias ProcessHub.Constant.Hook

# Register a hook handler for the `:post_cluster_join_hook` event with a wildcard argument.
HookManager.register_handler(
  :my_hub, 
  Hook.post_cluster_join(),
  %HookManager{
    id: :hook_id_30, 
    m: MyModule,
    f: :my_function,
    a: [:some_data, :_]
  }
)

Hook handler examples

# The hook handler should be in the following format:
defmodule MyModule do
  def my_function(:some_data, dynamic_hook_data) do
    # Do something with the data.
  end
end

Using hooks to react and handle asynchronous events

Sometimes, when starting a child process, we need to know when it has been started and is ready to receive messages. We could use the :async_wait option, but that would block the process that is starting the child process. Instead, we can use hooks to react asynchronously, for example by handling the :registry_pid_inserted_hook event, which is triggered once the child process has been registered.

Example:

# Register hook handler
alias ProcessHub.Service.HookManager
alias ProcessHub.Constant.Hook
...
hub_id = :my_hub

hook_handler1 = %HookManager{
  id: :handler_id_1,
  m: MyModule,
  f: :my_first_func,
  a: [:_]
}

hook_handler2 = %HookManager{
  id: :handler_id_2,
  m: MyModule,
  f: :my_second_func,
  a: [:hub_id, :_]
}

# We're triggering the hook when child process has been registered under the process registry.
HookManager.register_handlers(:my_hub, Hook.registry_pid_inserted(), [hook_handler1, hook_handler2])
...

# The hook handlers
defmodule MyModule do
  # Registered with `a: [:_]`, so the function receives only the hook data.
  def my_first_func({_child_id, node_pids}) do
    # Send message to all started child processes.
    Enum.each(node_pids, fn {_node, pid} ->
      Process.send(pid, :hello_world, [])
    end)
  end

  def my_second_func(hub_id, hook_data) do
    ...
  end
end

Internally registered hook handlers

ProcessHub registers hook handlers internally, depending on the active configuration. These hook handlers should never be removed or modified. The IDs of the internally registered hook handlers are:

  • :ch_join
  • :ch_leave
  • :ch_shutdown
  • :mhs_shutdown
  • :mhs_process_startups
  • :rr_post_start
  • :rr_post_stop
  • :rr_post_update
  • :dg_pre_start_handler

Hooks in custom strategy implementations

Hooks can be used in custom strategy implementations to trigger events on specific actions and extend the functionality of the strategy.

Here's an example of ProcessHub using hooks internally in the ProcessHub.Strategy.Distribution.ConsistentHashing strategy implementation:

alias ProcessHub.Constant.Hook
alias ProcessHub.Constant.StorageKey
alias ProcessHub.Service.Storage
alias ProcessHub.Service.HookManager

# Register hook handlers to update the hash ring when node joins or leaves the cluster.
def init(_strategy, hub_id) do
  local_storage = ProcessHub.Utility.Name.local_storage(hub_id)

  hub_nodes = Storage.get(local_storage, :hub_nodes)

  Storage.insert(local_storage, StorageKey.hr, Ring.create_ring(hub_nodes))

  join_handler = %HookManager{
    id: :ch_join,
    m: ProcessHub.Strategy.Distribution.ConsistentHashing,
    f: :handle_node_join,
    a: [hub_id, :_]
  }

  leave_handler = %HookManager{
    id: :ch_leave,
    m: ProcessHub.Strategy.Distribution.ConsistentHashing,
    f: :handle_node_leave,
    a: [hub_id, :_]
  }

  shutdown_handler = %HookManager{
    id: :ch_shutdown,
    m: ProcessHub.Strategy.Distribution.ConsistentHashing,
    f: :handle_shutdown,
    a: [hub_id],
    p: 100
  }

  HookManager.register_handler(hub_id, Hook.pre_cluster_join(), join_handler)
  HookManager.register_handler(hub_id, Hook.pre_cluster_leave(), leave_handler)
  HookManager.register_handler(hub_id, Hook.coordinator_shutdown(), shutdown_handler)
end


# Handler functions that will be called when the hooks are triggered.
def handle_node_join(hub_id, node) do
  hash_ring = Ring.get_ring(hub_id) |> Ring.add_node(node)

  Storage.insert(ProcessHub.Utility.Name.local_storage(hub_id), StorageKey.hr, hash_ring)
end

def handle_node_leave(hub_id, node) do
  hash_ring = Ring.get_ring(hub_id) |> Ring.remove_node(node)

  Storage.insert(ProcessHub.Utility.Name.local_storage(hub_id), StorageKey.hr, hash_ring)
end

Hook handler priority

By default, ProcessHub does not guarantee the order in which hook handlers are executed unless a handler specifies a priority level. The priority level is an integer that determines the execution order: the higher the priority level, the earlier the hook handler is executed. By default, all hook handlers have a priority level of 0.

We define the priority level of a hook handler by setting the :p field of the ProcessHub.Service.HookManager.t/0 struct.

Here is an example of how to set the priority level of a hook handler to 15:

alias ProcessHub.Service.HookManager
alias ProcessHub.Constant.Hook
...
hook_handler = %HookManager{
  id: :handler_id_1,
  m: MyModule,
  f: :handle_child_start,
  a: [],
  p: 15 # here we set the priority level to 15.
}

HookManager.register_handler(:my_hub, Hook.pre_cluster_leave(), hook_handler)
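
The ordering rule can be illustrated with a small sketch in plain Elixir. This is only a model of the rule stated above, not ProcessHub's dispatch code: the PrioritySketch module and its order/1 function are hypothetical, and the handlers are plain maps with an :id and a :p field.

```elixir
defmodule PrioritySketch do
  # Hypothetical helper, not part of ProcessHub: orders handlers so that
  # higher `:p` values come first.
  def order(handlers) do
    Enum.sort_by(handlers, fn handler -> handler.p end, :desc)
  end
end

handlers = [
  %{id: :default_priority, p: 0},
  %{id: :runs_first, p: 100},
  %{id: :runs_second, p: 15}
]

handlers
|> PrioritySketch.order()
|> Enum.map(fn handler -> handler.id end)
# => [:runs_first, :runs_second, :default_priority]
```

Handlers that share the same priority level should not rely on any particular order relative to each other.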

Hook handler arguments

The :a field of a hook handler holds a list of arguments. These values are passed to the hook handler function in the same order. Some hook dispatchers also pass additional data to the handler function. To receive it, place the wildcard argument, represented by the :_ symbol, in the list at the position where the data should appear; it is replaced with the hook data when the hook is dispatched.

Example:

alias ProcessHub.Service.HookManager
alias ProcessHub.Constant.Hook
...
hook_handler = %HookManager{
  id: :handler_id_1,
  m: MyModule,
  f: :handle_child_start,
  a: [:_, :some, :something_else] # here the first argument will be the wildcard argument.
}

HookManager.register_handler(:my_hub, Hook.pre_cluster_join(), hook_handler)

# Here the handler function receives the wildcard argument as the first argument.
defmodule MyModule do
  def handle_child_start(wild_card_value, :some, :something_else) do
    # Because we used the `:pre_cluster_join_hook` hook event the wild_card_value will be a `node()`
  end
end

You can familiarize yourself with the available hooks and their data in the last section of this guide.

Listing hook handlers

You can list all registered hook handlers for a specific hook event by calling the registered_handlers/2 function from the ProcessHub.Service.HookManager module.

Example:

iex> alias ProcessHub.Service.HookManager
iex> alias ProcessHub.Constant.Hook 
iex> HookManager.registered_handlers(:my_hub, Hook.registry_pid_inserted())
[
  %ProcessHub.Service.HookManager{
    id: :handler_id_2,
    m: MyModule,
    f: :my_second_func,
    a: [:hub_id, :_],
    p: 0
  }
]
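
The returned list can be inspected like any other Elixir list. As a minimal sketch, the hypothetical HandlerLookupSketch.registered?/2 helper below (not part of ProcessHub) checks whether a handler with a given id is present. Plain maps stand in for the handler structs so that the snippet runs on its own.

```elixir
defmodule HandlerLookupSketch do
  # Hypothetical helper, not part of ProcessHub: returns true when a
  # handler with the given id is present in the list.
  def registered?(handlers, handler_id) do
    Enum.any?(handlers, fn handler -> handler.id == handler_id end)
  end
end

# Plain maps stand in for the handler structs returned by registered_handlers/2.
handlers = [
  %{id: :handler_id_2, m: MyModule, f: :my_second_func, a: [:hub_id, :_], p: 0}
]

HandlerLookupSketch.registered?(handlers, :handler_id_2)
# => true
```

In a running hub, the first argument would be the result of the registered_handlers/2 call shown above.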

Removing hook handlers

You can remove a specific hook handler by calling the cancel_handler/3 function.

Example:

iex> alias ProcessHub.Service.HookManager
iex> alias ProcessHub.Constant.Hook 
iex> HookManager.cancel_handler(:my_hub, Hook.registry_pid_inserted(), :handler_id_1)
:ok

It's also possible to dispatch hooks manually by calling the dispatch_hook/3 or dispatch_hooks/3 functions from the ProcessHub.Service.HookManager module. However, dispatching hooks manually is not recommended, as it can lead to unexpected behavior.

Available hooks

Event Key                         | Trigger                                     | Data
----------------------------------|---------------------------------------------|-----------------------------------------------
:pre_cluster_join_hook            | Node joins hub cluster, not handled         | node()
:post_cluster_join_hook           | Node joins hub cluster, handled             | node()
:pre_cluster_leave_hook           | Node leaves hub cluster, not handled        | node()
:post_cluster_leave_hook          | Node leaves hub cluster, handled            | node()
:registry_pid_inserted_hook       | Process registered                          | {child_id(), [{node(), pid()}]}
:registry_pid_removed_hook        | Process unregistered                        | child_id()
:children_migrated_hook           | Processes migrated                          | {node(), [child_spec()]}
:priority_state_updated_hook      | Priority state updated                      | {priority_level(), list()}
:pre_nodes_redistribution_hook    | Nodes redistribution start                  | {:nodeup or :nodedown, node()}
:post_nodes_redistribution_hook   | Nodes redistribution end                    | {:nodeup or :nodedown, node()}
:forwarded_migration_hook         | Migration was forwarded and handled         | [{node(), [children_start_data()]}]
:pre_children_start               | Before child process is started             | ProcessHub.Handler.ChildrenAdd.StartHandle.t()
:post_children_start              | After child process is started              | [{child_id(), result(), pid(), [node()]}]
:pre_children_redistribution_hook | Before redistribution of children is called | [{child_id(), node(), {:pid, pid()}}]
:coordinator_shutdown_hook        | Coordinator shutdown                        | any()
:process_startups_hook            | Processes have been started                 | [%{cid: child_id, pid: pid()}]
:child_process_pid_update         | Child process restarted with new pid        | {node(), child_id()}

See ProcessHub.Constant.Hook module for more information on available hooks.
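
As a sketch of how a handler can destructure hook data, the hypothetical HookDataSketch module below pattern matches on the {child_id(), [{node(), pid()}]} shape listed for :registry_pid_inserted_hook. The module name, function name, and message format are assumptions made for illustration. The handler is called directly here to simulate a dispatch, so the snippet runs without a hub.

```elixir
defmodule HookDataSketch do
  # Hypothetical handler for `:registry_pid_inserted_hook`. The data shape
  # `{child_id, [{node, pid}]}` is taken from the list of available hooks.
  def on_pid_inserted(tag, {child_id, node_pids}) do
    Enum.each(node_pids, fn {node, pid} ->
      send(pid, {tag, child_id, node})
    end)
  end
end

# Simulate a dispatch by calling the handler directly with the current process.
HookDataSketch.on_pid_inserted(:registered, {:my_child, [{node(), self()}]})

receive do
  {:registered, child_id, _node} -> child_id
after
  100 -> :timeout
end
```

With a real hub, such a handler would be registered with arguments like [:registered, :_] so that the wildcard is filled with the hook data.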