Zero Cost Distribution

Cloister provides a callback on cluster topology changes, which makes it easy to perform some additional setup when the cluster is up.

But what if we want to distribute our processing without actually writing the support for running in distributed mode? For that, one might use the Tarearbol.DynamicManager abstraction.

Let’s say we receive gazillions of messages from some message broker and need to process them asynchronously. A typical application would use Cloister as the cluster manager and Tarearbol.DynamicManager as the workload processor. Our goal is to build an absolutely transparent system that scales horizontally out of the box. We cannot just spawn another node in the hope that everything will be fine: the processor for each message is determined by a HashRing backed by Cloister, while the message itself is pulled from the broker by a random, currently available node. Of course, we might ask Cloister which process is to handle the message and send it there directly, but this could suffer from back pressure issues, either blocking the broker queue or overflowing the target process mailbox.
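
To make the routing problem concrete, here is a minimal sketch of a hash-based ownership check that any node can compute locally. It is not Cloister’s HashRing: the module name RingSketch is hypothetical, and a plain modulo over :erlang.phash2/2 stands in for the ring purely for illustration.

```elixir
defmodule RingSketch do
  @moduledoc false
  # Hypothetical illustration only: modulo hashing, not Cloister's HashRing.

  # Returns the node responsible for `key` among `nodes`.
  @spec owner(term(), [node()]) :: node()
  def owner(key, nodes) when nodes != [] do
    # sort first, so that every node computes the same answer
    sorted = Enum.sort(nodes)
    Enum.at(sorted, :erlang.phash2(key, length(sorted)))
  end

  # Tells whether `this` node is the one responsible for `key`.
  @spec mine?(term(), node(), [node()]) :: boolean()
  def mine?(key, this, nodes), do: owner(key, nodes) == this
end
```

The node that pulled a message from the broker is usually not the one returned by owner/2, which is exactly why the message has to travel somehow. Note that with plain modulo hashing most keys change their owner when the node count changes; a real hash ring is designed to keep that reshuffling small.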

The easiest solution is to store incoming messages in a local store, from which our workers consume them. The data flow would look somewhat like this:

[Figure: Cloister + Tarearbol Data Flow]


That would be a no-brainer if we were running on long-lived legacy instances. But here we have a dockerized environment: containers might appear and disappear almost at random, and we want to handle each topology change gracefully.

Topology Change Listener

First of all, we need to supply our own implementation of the topology change listener, as described in Configuration.

config :cloister,
  otp_app: :my_app,
  listener: MyApp.CloisterListener,
  ...

The boilerplate for the implementation would look like this:

defmodule MyApp.CloisterListener do
  @moduledoc false

  @behaviour Cloister.Listener
  require Logger

  @impl Cloister.Listener
  def on_state_change(_from, %Cloister.Monitor{status: :up}) do
    Logger.info("Cluster is up")
  end

  def on_state_change(from, %Cloister.Monitor{status: status}) do
    Logger.debug("Cluster state change #{from} -> #{status}")
  end
end

So far, so good. Now we want to support distributed mnesia here. Let’s suppose we can survive a blackout of several seconds on topology changes, and that we use mnesia as a cache, so it does not contain billions of records. We would simply load the existing cache content into memory, recreate mnesia for the new topology, and feed it with the data from memory. There are many more elegant and robust solutions, but for demonstration purposes this scenario is perfectly fine.

Let’s change the first on_state_change/2 clause to do that.

@impl Cloister.Listener
def on_state_change(_from, %Cloister.Monitor{status: :up}) do
  this = node()
  others = Node.list()
  nodes = [this | others]

  # deterministically pick a master: the node that sorts first
  if match?([^this | _], Enum.sort(nodes)) do
    load_data_into_memory() # the implementation is out of scope

    # @tables is expected to be a module attribute listing the cache tables
    Enum.each(@tables, &:mnesia.delete_table/1)

    :mnesia.stop()
    :mnesia.delete_schema(nodes)
    :mnesia.create_schema(nodes)
    :mnesia.start()

    Enum.each(@tables, &:mnesia.create_table(&1, disc_copies: nodes))
    # :mnesia.wait_for_tables/2 needs a timeout; 5 seconds is an arbitrary choice
    :mnesia.wait_for_tables(@tables, 5_000)

    load_data_from_memory() # the implementation is out of scope

    # deploy this to other nodes
    unless nodes == [this] do
      :mnesia.change_config(:extra_db_nodes, others)
      Enum.each(others, &:mnesia.change_table_copy_type(:schema, &1, :disc_copies))

      for table <- :mnesia.system_info(:tables),
          table in @tables,
          {^this, type} <- :mnesia.table_info(table, :where_to_commit),
          do: Enum.each(others, &:mnesia.add_table_copy(table, &1, type))
    end
  end

  # return :ok on every node, as the logging clauses do
  :ok
end

We are done! Now every topology change enforces the renewal of the mnesia configuration. It surely might be done in a more sophisticated way, e.g. by deleting schemas on remote nodes and restarting mnesia there, but this is good enough.
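
The two helpers marked as out of scope above could be sketched roughly as follows. This is only one possible shape, not the implementation used here: the module name CacheSnapshotSketch, the function names, and the choice to pass the snapshot around explicitly are all assumptions. It reads every record of the given tables with dirty operations and writes them back after the tables have been recreated.

```elixir
defmodule CacheSnapshotSketch do
  @moduledoc false
  # Hypothetical helpers; the names and the explicit snapshot argument are assumptions.

  # Reads all records of the given tables into a map of table => records.
  @spec dump([atom()]) :: %{atom() => [tuple()]}
  def dump(tables) do
    Map.new(tables, fn table ->
      # the wild pattern matches every record stored in the table
      pattern = :mnesia.table_info(table, :wild_pattern)
      {table, :mnesia.dirty_match_object(table, pattern)}
    end)
  end

  # Writes the records from the snapshot back into the (recreated) tables.
  @spec restore(%{atom() => [tuple()]}) :: :ok
  def restore(snapshot) do
    for {table, records} <- snapshot, record <- records do
      :ok = :mnesia.dirty_write(table, record)
    end

    :ok
  end
end
```

Since the helpers in the listener take no arguments, the snapshot would have to be kept somewhere between the two calls, for instance in a variable inside the clause or in the state of a dedicated process.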

OK, it’s now time to set up the processors.

DynamicManager Implementation

As described in the documentation, Tarearbol.DynamicManager expects a worker definition. Let’s provide it.

defmodule MyApp.WorkProcessor do
  @moduledoc false

  use Tarearbol.DynamicManager

  @impl Tarearbol.DynamicManager
  def children_specs do
    # start a million processes, handling entities named like "foo_42"
    for i <- 1..1_000_000, into: %{}, do: {"foo_#{i}", []}
  end

  @impl Tarearbol.DynamicManager
  def perform(foo, _payload) do
    if Cloister.mine?(foo),
      do: {:replace, foo, perform_work()},
      else: {:ok, DateTime.utc_now()}
  end

  @impl Tarearbol.DynamicManager
  def handle_state_change(state), do: state

  @impl Tarearbol.DynamicManager
  def handle_timeout(_state), do: :ok

  @spec perform_work :: [any()]
  defp perform_work do
    for record <- :mnesia.select(...), do: ...
  end
end

We are all set now. Once a message arrives from the broker, it gets stored in mnesia. Then the worker for this key fetches the message(s) from there and does whatever it needs.
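
The store-then-consume step might be sketched as below. Everything here is an assumption made for illustration: the module name InboxSketch, the table name and its attributes, and the use of a RAM-only table on a single node. The broker consumer would call store/2, and the worker for a key would call take/1 from its perform/2.

```elixir
defmodule InboxSketch do
  @moduledoc false
  # Hypothetical message inbox; table name and layout are assumptions.

  @table :inbox_sketch

  # Starts mnesia and creates the table unless it already exists.
  @spec setup() :: :ok
  def setup do
    :ok = :mnesia.start()

    case :mnesia.create_table(@table, attributes: [:id, :key, :payload]) do
      {:atomic, :ok} -> :ok
      {:aborted, {:already_exists, @table}} -> :ok
    end
  end

  # Called by whichever node pulled the message from the broker.
  @spec store(term(), term()) :: :ok
  def store(key, payload) do
    # a unique id lets several messages coexist under the same key
    id = {node(), System.unique_integer([:monotonic])}
    :mnesia.dirty_write({@table, id, key, payload})
  end

  # Called by the worker owning `key`: fetches and removes its messages.
  @spec take(term()) :: [term()]
  def take(key) do
    {:atomic, payloads} =
      :mnesia.transaction(fn ->
        records = :mnesia.match_object({@table, :_, key, :_})
        Enum.each(records, &:mnesia.delete_object/1)
        for {_table, _id, _key, payload} <- records, do: payload
      end)

    payloads
  end
end
```

Reading and deleting inside one transaction keeps a message from being handed out twice; the order of the returned payloads is not guaranteed.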

When a node is added or removed, Cloister changes the state to :rehashing, the mnesia database is rebuilt, and message processing continues with the new cluster topology.