Creating custom strategy
It's time to extend the functionality of ProcessHub, and we will do this by creating a custom distribution strategy.
Implementation
Our custom distribution strategy will be very simple and probably not very useful in practice, but it will serve as a good example of how to create one.
This distribution strategy will distribute processes by comparing the node names. We will also take advantage of the hook system to further customize the behavior of the system.
Let's start by creating a new file called compare.ex. We can place the file anywhere in the project as long as it is picked up by the compiler.
It is good practice to namespace the module to avoid conflicts with other modules. We will use CustomStrategy as the namespace.
defmodule CustomStrategy.Compare do
end
Our custom distribution strategy should define a struct, which can contain any number of custom options. In our example, we will define a single option called direction, which determines whether the selected node is the first or the last node in the sorted list of nodes.
We will also create our implementation of the ProcessHub.Strategy.Distribution.Base protocol.
defmodule CustomStrategy.Compare do
  alias ProcessHub.Strategy.Distribution.Base, as: DistributionStrategy

  # We can define custom options.
  defstruct [
    direction: :asc
  ]

  defimpl DistributionStrategy, for: CustomStrategy.Compare do
  end
end
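To make the effect of the direction option concrete, here is a minimal sketch in plain Elixir that does not depend on ProcessHub. The DirectionSketch module and its pick/2 function are hypothetical names used only for illustration; they are not part of the library.

```elixir
defmodule DirectionSketch do
  # Hypothetical helper, not part of ProcessHub: picks a single node
  # name from the list depending on the sort direction.
  def pick(nodes, :asc), do: nodes |> Enum.sort() |> List.first()
  def pick(nodes, :desc), do: nodes |> Enum.sort(:desc) |> List.first()
end

nodes = [:"node2@127.0.0.1", :"node1@127.0.0.1"]

DirectionSketch.pick(nodes, :asc)
# => :"node1@127.0.0.1"

DirectionSketch.pick(nodes, :desc)
# => :"node2@127.0.0.1"
```

Because every node sorts the same list in the same way, all nodes agree on the selected node without any extra coordination.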
The ProcessHub.Strategy.Distribution.Base protocol defines some functions that we need to implement. The first is the init/2 function, which is called very early when the coordinator is started. This is a very good place to register any custom hook handlers that we need. We will register a handler that prints the name of the node that is joining the hub.
...
defimpl DistributionStrategy, for: CustomStrategy.Compare do
  alias ProcessHub.Service.HookManager
  alias ProcessHub.Constant.Hook

  @impl true
  def init(_struct, hub_id) do
    handler = %HookManager{
      id: :some_id_here,
      m: CustomStrategy.Compare,
      f: :handle_node_join,
      # :_ is a placeholder for the data passed by the hook
      # (here, the joining node).
      a: [hub_id, :_]
    }

    HookManager.register_handler(hub_id, Hook.pre_cluster_join(), handler)
  end
end
...
Let's also define the handler function!
defmodule CustomStrategy.Compare do
  ...
  def handle_node_join(hub_id, node) do
    IO.puts("Node #{node} is joining the hub #{hub_id}")
  end
end
...
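The handler takes two arguments, while the registered argument list is [hub_id, :_]. The sketch below illustrates how such a placeholder could be filled in before the handler is called. It is a simplified illustration under that assumption and not ProcessHub's actual code; HookSketch, HookSketch.Handler, and dispatch/2 are hypothetical names, and the handler is described by a plain map rather than the HookManager struct.

```elixir
defmodule HookSketch do
  # Simplified illustration only; this is not ProcessHub's actual code.
  # Replaces every :_ in the handler's argument list with the hook data,
  # then calls the handler function with the resolved arguments.
  def dispatch(%{m: module, f: function, a: args}, hook_data) do
    resolved_args =
      Enum.map(args, fn
        :_ -> hook_data
        arg -> arg
      end)

    apply(module, function, resolved_args)
  end
end

defmodule HookSketch.Handler do
  # Hypothetical handler that returns the message instead of printing it.
  def handle_node_join(hub_id, node) do
    "Node #{node} is joining the hub #{hub_id}"
  end
end

handler = %{m: HookSketch.Handler, f: :handle_node_join, a: [:my_hub, :_]}

HookSketch.dispatch(handler, :"node2@127.0.0.1")
# => "Node node2@127.0.0.1 is joining the hub my_hub"
```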
At this point we only have two more functions to implement. Let's continue by implementing the children_init/4 function, which does nothing but return :ok.
If we return anything other than :ok, the startup will fail for this particular function.
...
@impl true
def children_init(_struct, _hub_id, _child_specs, _opts), do: :ok
...
We now have the last function to implement, belongs_to/4. This function determines the node that the process should be distributed to. In our case, we will select a node based on its name.
The very same function is also used when processes are redistributed.
We return a list of nodes even though we select only one, because the function can return multiple nodes when the process should be replicated. For the sake of simplicity, we will ignore the replication factor in this example.
...
@impl true
def belongs_to(struct, hub_id, _child_id, _replication_factor) do
  hub_nodes = ProcessHub.Service.Cluster.nodes(hub_id, [:include_local])

  selected_node =
    case struct.direction do
      :asc -> Enum.sort(hub_nodes) |> Enum.at(0)
      :desc -> Enum.sort(hub_nodes) |> Enum.reverse() |> Enum.at(0)
    end

  # Ignore the replication factor and just return one node
  [selected_node]
end
...
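The guide ignores the replication factor, but for comparison, here is a hedged sketch of how the node selection could honor it. This is plain Elixir that does not call ProcessHub; ReplicationSketch and select/3 are hypothetical names, and the approach (take the first N nodes of the sorted list) is only one possible choice, not the library's method.

```elixir
defmodule ReplicationSketch do
  # Hypothetical helper, not part of ProcessHub: returns up to
  # `replication_factor` nodes from the list sorted in the given direction.
  def select(nodes, direction, replication_factor)
      when direction in [:asc, :desc] and
             is_integer(replication_factor) and replication_factor > 0 do
    nodes
    |> Enum.sort(direction)
    |> Enum.take(replication_factor)
  end
end

nodes = [:"node3@127.0.0.1", :"node1@127.0.0.1", :"node2@127.0.0.1"]

ReplicationSketch.select(nodes, :asc, 2)
# => [:"node1@127.0.0.1", :"node2@127.0.0.1"]

ReplicationSketch.select(nodes, :desc, 1)
# => [:"node3@127.0.0.1"]
```

If the replication factor is larger than the number of nodes, Enum.take/2 simply returns all of them.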
This is it!
Final implementation
Our final implementation of the CustomStrategy.Compare module should look like this:
defmodule CustomStrategy.Compare do
  alias ProcessHub.Strategy.Distribution.Base, as: DistributionStrategy

  # We can define custom options.
  defstruct [
    direction: :asc
  ]

  def handle_node_join(hub_id, node) do
    IO.puts("Node #{node} is joining the hub #{hub_id}")
  end

  defimpl DistributionStrategy, for: CustomStrategy.Compare do
    alias ProcessHub.Service.HookManager
    alias ProcessHub.Constant.Hook

    @impl true
    def init(_struct, hub_id) do
      handler = %HookManager{
        id: :some_id_here,
        m: CustomStrategy.Compare,
        f: :handle_node_join,
        a: [hub_id, :_]
      }

      HookManager.register_handler(hub_id, Hook.pre_cluster_join(), handler)
    end

    @impl true
    def children_init(_struct, _hub_id, _child_specs, _opts), do: :ok

    @impl true
    def belongs_to(struct, hub_id, _child_id, _replication_factor) do
      hub_nodes = ProcessHub.Service.Cluster.nodes(hub_id, [:include_local])

      selected_node =
        case struct.direction do
          :asc -> Enum.sort(hub_nodes) |> Enum.at(0)
          :desc -> Enum.sort(hub_nodes) |> Enum.reverse() |> Enum.at(0)
        end

      # Ignore the replication factor and just return one node
      [selected_node]
    end
  end
end
Switching to the custom strategy
Replace the default distribution strategy with our custom strategy in the ProcessHub configuration.
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [process_hub()]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

  defp process_hub() do
    {ProcessHub, %ProcessHub{
      hub_id: :my_hub,
      distribution_strategy: %CustomStrategy.Compare{
        direction: :desc
      }
    }}
  end
end
Test drive
Let's start two nodes on separate terminals.
iex --name node1@127.0.0.1 --cookie mycookie -S mix
iex --name node2@127.0.0.1 --cookie mycookie -S mix
Connect the nodes by running the following command in the node1 terminal.
iex> Node.connect(:"node2@127.0.0.1")
true
Time to see the magic happen!
iex> ProcessHub.start_children(:my_hub, [
...>   %{id: :process1, start: {MyProcess, :start_link, []}},
...>   %{id: :process2, start: {MyProcess, :start_link, []}},
...>   %{id: :process3, start: {MyProcess, :start_link, []}},
...>   %{id: :process4, start: {MyProcess, :start_link, []}},
...>   %{id: :process5, start: {MyProcess, :start_link, []}}
...> ])
{:ok, :start_initiated}
iex> ProcessHub.process_list(:my_hub, :global)
[
process1: ["node2@127.0.0.1": #PID<23066.269.0>],
process2: ["node2@127.0.0.1": #PID<23066.270.0>],
process3: ["node2@127.0.0.1": #PID<23066.271.0>],
process4: ["node2@127.0.0.1": #PID<23066.272.0>],
process5: ["node2@127.0.0.1": #PID<23066.273.0>]
]
We can confirm that all processes are started on the second node when we pass the :desc option to the struct, because node2@127.0.0.1 sorts last among the node names.