ProcessHub.Strategy.Migration.SwapMigration (ProcessHub v0.5.0-beta)

Shared migration logic used by both ColdSwap and HotSwap strategies.

This module extracts the common patterns for topology expansion and contraction so that both strategies can reuse the same core logic while differing only in their termination timing and post-action behavior.

Summary

Functions

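collect_states(remaining_cids, timeout, acc, response_msg_atom)

Generic state collection with configurable response message atom.

compute_processable(hub, handler)

Computes processable children for migration during topology expansion.

create_migration_requests(hub, grouped_by_node, post_action)

Creates StartChildrenRequest.for_migration for each node group.

dispatch_migration_hook(hub, migrated_cspecs, nodes)

Dispatches Hook.migration_completed() if migrated list is non-empty.

group_children_by_node(forward_to_list)

Groups [{cspec, meta, target_nodes}] into %{node => [{cspec, meta}]}.

handle_contraction(hub, handler)

Full contraction logic (identical for both strategies).

handle_process_startups(hub, cpids, storage_key, delivery_msg)

Handles delivery of pre-sent shutdown states when processes start on the target node.

handle_shutdown(hub, timeout, query_msg, response_msg, callback_mod, log_prefix)

Handles graceful shutdown by querying states from all local processes and sending them to target nodes before this node goes down.

handle_storage_update(hub, data, storage_key)

Stores received shutdown state data on the target node.

notify_originating_node(hub, results, originating_node, child_ids, callback_mod, callback_fun)

Shared post-action logic on target node.

send_start_requests(hub, requests)

Sends start requests via Dispatcher if non-empty.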

Functions

collect_states(remaining_cids, timeout, acc, response_msg_atom)

@spec collect_states([ProcessHub.child_id()], non_neg_integer(), list(), atom()) ::
  list()

Generic state collection with configurable response message atom.

ColdSwap uses :coldswap_state, HotSwap uses :hotswap_state.
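The collection pattern described above can be sketched as a recursive receive loop. This is an illustration only, not the library's implementation: the module name CollectSketch and the reply shape {response_msg_atom, cid, state} are assumptions, since the actual message format is not documented here.

```elixir
defmodule CollectSketch do
  # Hypothetical stand-in for collect_states/4.
  # Assumed reply shape: {response_msg_atom, cid, state}.

  # Nothing left to wait for: return what has been gathered.
  def collect([], _timeout, acc, _response_msg), do: acc

  def collect(remaining, timeout, acc, response_msg) do
    receive do
      # The pin (^) matches only messages tagged with the configured atom.
      {^response_msg, cid, state} ->
        collect(List.delete(remaining, cid), timeout, [{cid, state} | acc], response_msg)
    after
      # No reply arrived within `timeout` ms: return the partial result.
      timeout -> acc
    end
  end
end

parent = self()

# Two of the three children answer; :c never does.
for cid <- [:a, :b] do
  spawn(fn -> send(parent, {:coldswap_state, cid, %{count: 1}}) end)
end

states = CollectSketch.collect([:a, :b, :c], 200, [], :coldswap_state)
IO.inspect(Enum.sort(states))
```

In this sketch the timeout restarts after every received message, so the total wait can exceed the timeout value. Passing :hotswap_state instead of :coldswap_state is the only change needed to reuse the loop for the other strategy.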

compute_processable(hub, handler)

@spec compute_processable(ProcessHub.Hub.t(), map()) :: {map(), map(), map()}

Computes processable children for migration during topology expansion.

Gets migration candidates, calculates distribution, populates handler.calculated_cids, and categorizes children into %{stop_local: [...], forward_to: [...]}.

Returns {handler, processable, migration_candidates}.

create_migration_requests(hub, grouped_by_node, post_action)

@spec create_migration_requests(
  ProcessHub.Hub.t(),
  %{required(node()) => [{ProcessHub.child_spec(), map()}]},
  ProcessHub.Request.PostAction.t() | nil
) :: [ProcessHub.Request.Handler.StartChildrenRequest.t()]

Creates StartChildrenRequest.for_migration for each node group.

post_action is nil or a PostAction struct.

dispatch_migration_hook(hub, migrated_cspecs, nodes)

@spec dispatch_migration_hook(ProcessHub.Hub.t(), [ProcessHub.child_spec()], [node()]) ::
  :ok

Dispatches Hook.migration_completed() if the list of migrated child specs is non-empty.

group_children_by_node(forward_to_list)

@spec group_children_by_node([{ProcessHub.child_spec(), map(), [node()]}]) :: %{
  required(node()) => [{ProcessHub.child_spec(), map()}]
}

Groups [{cspec, meta, target_nodes}] into %{node => [{cspec, meta}]}.
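A minimal sketch of this grouping, assuming plain maps stand in for child specs and metadata. The module GroupingSketch and the node names are hypothetical; the library's own implementation may differ.

```elixir
defmodule GroupingSketch do
  # Hypothetical stand-in for group_children_by_node/1.
  @spec group_by_node([{map(), map(), [node()]}]) :: %{required(node()) => [{map(), map()}]}
  def group_by_node(forward_to_list) do
    forward_to_list
    |> Enum.flat_map(fn {cspec, meta, target_nodes} ->
      # Emit one {node, {cspec, meta}} pair per target node.
      Enum.map(target_nodes, fn node -> {node, {cspec, meta}} end)
    end)
    # Key by node and keep only the {cspec, meta} part as the value.
    |> Enum.group_by(fn {node, _child} -> node end, fn {_node, child} -> child end)
  end
end

forward_to = [
  {%{id: :worker_a}, %{}, [:"n1@host", :"n2@host"]},
  {%{id: :worker_b}, %{tag: "x"}, [:"n2@host"]}
]

grouped = GroupingSketch.group_by_node(forward_to)
IO.inspect(grouped)
```

A child with several target nodes appears once under each of them, so here :worker_a is listed under both nodes and :worker_b only under the second.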

handle_contraction(hub, handler)

@spec handle_contraction(ProcessHub.Hub.t(), map()) :: map()

Full contraction logic (identical for both strategies).

Uses pre-calculated handler.calculated_cids to start children locally that should now be PRIMARY on the local node.

handle_process_startups(hub, cpids, storage_key, delivery_msg)

@spec handle_process_startups(ProcessHub.Hub.t(), list(), atom(), atom()) :: nil

Handles delivery of pre-sent shutdown states when processes start on the target node.

Parameters:

  • hub - the hub struct
  • cpids - list of %{cid: child_id, pid: pid} maps from the process_startups hook
  • storage_key - ETS key where shutdown states are stored
  • delivery_msg - atom for delivery message (e.g. :coldswap_handover)
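
The delivery step can be illustrated with a plain ETS table. Everything specific below is an assumption made for the sketch: the module DeliverySketch, the key :coldswap_shutdown_states, the stored shape {storage_key, [{cid, state}]}, and the delivered message {delivery_msg, state}. ProcessHub's own storage layer and message format may differ.

```elixir
defmodule DeliverySketch do
  # Hypothetical stand-in for the delivery logic.
  # Assumed storage shape: {storage_key, [{cid, state}]} in an ETS table.
  # Assumed message shape: {delivery_msg, state}.
  def deliver(table, cpids, storage_key, delivery_msg) do
    stored =
      case :ets.lookup(table, storage_key) do
        [{^storage_key, states}] -> Map.new(states)
        [] -> %{}
      end

    Enum.each(cpids, fn %{cid: cid, pid: pid} ->
      case Map.fetch(stored, cid) do
        # A state was pre-sent for this child: hand it to the new process.
        {:ok, state} -> send(pid, {delivery_msg, state})
        # Nothing stored for this child: start with a fresh state.
        :error -> :ok
      end
    end)
  end
end

table = :ets.new(:sketch_states, [:set, :public])
:ets.insert(table, {:coldswap_shutdown_states, [{:worker_a, %{count: 3}}]})

cpids = [%{cid: :worker_a, pid: self()}, %{cid: :worker_b, pid: self()}]
DeliverySketch.deliver(table, cpids, :coldswap_shutdown_states, :coldswap_handover)

delivered =
  receive do
    {:coldswap_handover, state} -> state
  after
    100 -> :nothing
  end

IO.inspect(delivered)
```

Only :worker_a has a stored state in this example, so exactly one handover message is sent.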

handle_shutdown(hub, timeout, query_msg, response_msg, callback_mod, log_prefix)

@spec handle_shutdown(
  ProcessHub.Hub.t(),
  non_neg_integer(),
  atom(),
  atom(),
  module(),
  String.t()
) :: :ok

Handles graceful shutdown by querying states from all local processes and sending them to target nodes before this node goes down.

Parameters:

  • hub - the hub struct
  • timeout - state query timeout in ms
  • query_msg - atom to send to processes (e.g. :query_cold_handover_state)
  • response_msg - atom expected in response (e.g. :coldswap_state)
  • callback_mod - module containing handle_storage_update/2 for remote cast
  • log_prefix - string for timeout log messages (e.g. "ColdSwap")

handle_storage_update(hub, data, storage_key)

@spec handle_storage_update(ProcessHub.Hub.t(), list(), atom()) :: :ok

Stores received shutdown state data on the target node.

Called via GenServer.cast from the shutting-down node.

notify_originating_node(hub, results, originating_node, child_ids, callback_mod, callback_fun)

Shared post-action logic on target node.

Filters the successfully started children and sends a callback to the originating node.

send_start_requests(hub, requests)

@spec send_start_requests(ProcessHub.Hub.t(), [
  ProcessHub.Request.Handler.StartChildrenRequest.t()
]) ::
  :ok

Sends start requests via Dispatcher if the request list is non-empty.