ProcessHub.Service.RequestManager (ProcessHub v0.5.0-beta)

Unified module for distributed operation tracking and execution utilities.

This module provides:

  • Operation struct for tracking distributed operations across nodes
  • State management (store, get, update, remove from coordinator state)
  • GenServer handlers for coordinator callbacks
  • Execution utilities (partition check, send response, request factories)
  • Tracking utilities (expired?, all_responded?, record_response)
  • Request splitting for batching large requests
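
The tracking utilities in the list above can be pictured with a small, self-contained sketch. The module `TrackingSketch` and its `expected_nodes` and `results` fields are hypothetical and exist only for illustration; only the function names `expired?`, `all_responded?`, and `record_response` and the fields `expires_at` and `completed_nodes` come from this page. The sketch also assumes `expires_at` is a millisecond timestamp, which the page does not confirm.

```elixir
defmodule TrackingSketch do
  # Hypothetical stand-in for a tracked request; not ProcessHub's implementation.
  # `expected_nodes` and `results` are assumed fields added for illustration.
  defstruct expected_nodes: MapSet.new(),
            completed_nodes: MapSet.new(),
            results: [],
            expires_at: 0

  # Assumes `expires_at` is a monotonic timestamp in milliseconds.
  def expired?(%{expires_at: expires_at}, now \\ System.monotonic_time(:millisecond)) do
    now >= expires_at
  end

  # True once every expected node appears in `completed_nodes`.
  def all_responded?(%{expected_nodes: expected, completed_nodes: completed}) do
    MapSet.subset?(expected, completed)
  end

  # Marks `node` as completed and appends its results to `results_field`.
  def record_response(request, node, node_results, results_field) do
    request
    |> Map.update!(:completed_nodes, &MapSet.put(&1, node))
    |> Map.update!(results_field, &(&1 ++ List.wrap(node_results)))
  end
end
```

Passing `now` explicitly keeps the expiry check easy to exercise in isolation; the real `expired?/1` takes only the request.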

Summary

Functions

compose_sub_requests/1
Composes sub-requests for each target node.

contraction_request/3
Factory: Creates a contraction request for topology contraction.

finalize/2
Finalizes an operation by aggregating results and running post-processing.

load_strategies/1
Loads all strategies from storage.

migration_request/4
Factory: Creates a migration request for topology expansion.

new/4
Creates a new Operation struct.

populate_forward/3
Groups children by target node into a keyword list [{node, [child_data, ...]}, ...].

process_response/3
Records a node's response and checks if operation is complete.

request_to_opts/1
Converts request struct to keyword options for response routing.

send_response/3
Sends response to coordinator.

split/1
Splits large requests into smaller batches for efficient cross-node transmission.

with_partition_check/2
Wraps function execution with partition check.

Types

@type t() :: %ProcessHub.Service.RequestManager{
  awaiter: {pid(), reference()} | nil,
  completed_nodes: MapSet.t(),
  expires_at: integer(),
  future: ProcessHub.Future.t() | nil,
  handler: module(),
  hub_id: ProcessHub.hub_id(),
  nodes_data: [{node(), [map()]}],
  options: keyword(),
  sub_requests: [struct()],
  transaction_id: reference()
}

Functions

@spec all_responded?(map()) :: boolean()
@spec cleanup_expired(ProcessHub.Hub.t()) :: ProcessHub.Hub.t()

compose_sub_requests(operation)

@spec compose_sub_requests(t()) :: {:ok, t()} | {:error, :no_children}

Composes sub-requests for each target node.
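
As a rough illustration of the `{:ok, t()} | {:error, :no_children}` contract, the sketch below builds one sub-request per target node from `nodes_data`. The module `ComposeSketch`, the plain-map sub-request shape, and the choice to skip nodes with no children are all assumptions; the real function builds structs whose layout is not shown on this page.

```elixir
defmodule ComposeSketch do
  # Hypothetical: builds one sub-request map per {node, children} pair.
  # Nodes with an empty child list are skipped (an assumption of this sketch).
  def compose_sub_requests(%{nodes_data: nodes_data} = operation) do
    sub_requests =
      for {node, children} <- nodes_data, children != [] do
        %{target_node: node, children: children}
      end

    case sub_requests do
      [] -> {:error, :no_children}
      _ -> {:ok, %{operation | sub_requests: sub_requests}}
    end
  end
end
```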

contraction_request(hub, children_data, opts \\ [migration_add: true])

Factory: Creates a contraction request for topology contraction.

@spec expired?(map()) :: boolean()

finalize(operation, hub)

@spec finalize(t(), ProcessHub.Hub.t()) :: struct()

Finalizes an operation by aggregating results and running post-processing.

get(state, transaction_id)

@spec get(ProcessHub.Hub.t(), reference()) :: t() | nil

handle_await(state, transaction_id, from)

@spec handle_await(ProcessHub.Hub.t(), reference(), {pid(), reference()}) ::
  {:reply, term(), ProcessHub.Hub.t()} | {:noreply, ProcessHub.Hub.t()}
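
The two return shapes in this spec match the standard GenServer deferred-reply pattern: reply at once if the result is already known, otherwise remember the caller's `from` tuple and answer later with `GenServer.reply/2`. The sketch below shows that pattern in isolation. `AwaitSketch`, its state layout, and the `{:await, ref}` and `{:complete, ref, result}` messages are hypothetical and do not reflect how the ProcessHub coordinator is wired.

```elixir
defmodule AwaitSketch do
  use GenServer

  # Hypothetical state: finished results and parked callers, both keyed by reference.
  @impl true
  def init(_arg), do: {:ok, %{results: %{}, awaiters: %{}}}

  @impl true
  def handle_call({:await, ref}, from, state) do
    case Map.fetch(state.results, ref) do
      # Result already available: reply immediately.
      {:ok, result} -> {:reply, result, state}
      # Not finished yet: park the caller and reply later.
      :error -> {:noreply, put_in(state.awaiters[ref], from)}
    end
  end

  @impl true
  def handle_cast({:complete, ref, result}, state) do
    {from, awaiters} = Map.pop(state.awaiters, ref)
    # Wake the parked caller, if there is one.
    if from, do: GenServer.reply(from, result)
    {:noreply, %{state | awaiters: awaiters, results: Map.put(state.results, ref, result)}}
  end
end
```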

handle_response(state, transaction_id, response_node, results)

@spec handle_response(ProcessHub.Hub.t(), reference(), node(), term()) ::
  {:noreply, ProcessHub.Hub.t()}

handle_timeout(state, transaction_id, from)

@spec handle_timeout(ProcessHub.Hub.t(), reference(), {pid(), reference()}) ::
  {:noreply, ProcessHub.Hub.t()}
@spec load_strategies(ProcessHub.Hub.t()) :: %{
  sync: term(),
  dist: term(),
  redun: term(),
  migr: term()
}

Loads all strategies from storage.

migration_request(hub, target_node, children_data, opts \\ [])

Factory: Creates a migration request for topology expansion.

new(hub, handler_module, nodes_data, opts)

@spec new(ProcessHub.Hub.t(), module(), [{node(), [map()]}], keyword()) :: t()

Creates a new Operation struct.
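
To make the shape of the struct concrete, here is a minimal sketch of a constructor that fills the fields listed in `t()`. `OperationSketch` is hypothetical, it takes a `hub_id` where the real function takes a `ProcessHub.Hub.t()`, and the `:timeout` option and the default values are assumptions made for illustration only.

```elixir
defmodule OperationSketch do
  # Hypothetical mirror of the fields listed in t(); the defaults are assumptions.
  defstruct awaiter: nil,
            completed_nodes: MapSet.new(),
            expires_at: 0,
            future: nil,
            handler: nil,
            hub_id: nil,
            nodes_data: [],
            options: [],
            sub_requests: [],
            transaction_id: nil

  # `:timeout` (milliseconds) is an assumed option name, used only in this sketch.
  def new(hub_id, handler_module, nodes_data, opts) do
    timeout = Keyword.get(opts, :timeout, 5_000)

    %__MODULE__{
      hub_id: hub_id,
      handler: handler_module,
      nodes_data: nodes_data,
      options: opts,
      # Absolute deadline derived from the relative timeout.
      expires_at: System.monotonic_time(:millisecond) + timeout,
      # A fresh reference identifies the operation.
      transaction_id: make_ref()
    }
  end
end
```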

populate_forward(forward_data, target_nodes, child_data)

@spec populate_forward(keyword(), [node()], map()) :: keyword()

Groups children by target node into a keyword list [{node, [child_data, ...]}, ...].
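
One way to read this grouping is as a reduction over the target nodes that adds the child to each node's list. The sketch below is an assumption about the behaviour, not the library's code: `ForwardSketch` is hypothetical, and prepending new children (so the newest comes first) is a choice made here for simplicity.

```elixir
defmodule ForwardSketch do
  # Hypothetical: adds `child_data` under every node in `target_nodes`.
  # Node names are atoms, so they are valid keyword-list keys.
  def populate_forward(forward_data, target_nodes, child_data) do
    Enum.reduce(target_nodes, forward_data, fn node, acc ->
      Keyword.update(acc, node, [child_data], &[child_data | &1])
    end)
  end
end
```

Calling it once per child accumulates a `[{node, [child_data, ...]}, ...]` list that can then be sent to each node in a single message.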

process_response(operation, response_node, results)

@spec process_response(t(), node(), term()) :: {:complete, t()} | {:pending, t()}

Records a node's response and checks if operation is complete.
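
The `{:complete, t()} | {:pending, t()}` result can be sketched as follows. `ResponseSketch` and the `results` field are hypothetical, and the completion rule used here (every node in `nodes_data` has responded) is an assumption based on the `completed_nodes` and `nodes_data` fields in `t()`.

```elixir
defmodule ResponseSketch do
  # Hypothetical: records one node's results, then checks whether all
  # nodes named in `nodes_data` have responded.
  def process_response(operation, response_node, results) do
    operation =
      operation
      |> Map.update!(:completed_nodes, &MapSet.put(&1, response_node))
      |> Map.update!(:results, &[{response_node, results} | &1])

    expected = MapSet.new(operation.nodes_data, fn {node, _children} -> node end)

    if MapSet.subset?(expected, operation.completed_nodes) do
      {:complete, operation}
    else
      {:pending, operation}
    end
  end
end
```

A caller would typically store the updated operation on `:pending` and finalize it on `:complete`.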

record_response(request, response_node, results, results_field)

@spec record_response(map(), node(), term(), atom()) :: map()

remove(state, transaction_id)

@spec remove(ProcessHub.Hub.t(), reference()) :: ProcessHub.Hub.t()

request_to_opts(request)

@spec request_to_opts(map()) :: keyword()

Converts request struct to keyword options for response routing.

send_response(response_type, opts, results)

@spec send_response(atom(), keyword(), [{ProcessHub.child_id(), term()}]) ::
  :ok | :skip

Sends response to coordinator.

set_awaiter(request, from)

@spec set_awaiter(
  map(),
  {pid(), reference()}
) :: map()
@spec split(struct()) :: [struct()]

Splits large requests into smaller batches for efficient cross-node transmission.
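
Batching of this kind is commonly done by chunking the request's payload and copying the remaining fields into each chunk. The sketch below illustrates the idea with `Enum.chunk_every/2`. `SplitSketch`, the `children` field, and the batch size of 2 are hypothetical; the page does not state how ProcessHub picks batch boundaries.

```elixir
defmodule SplitSketch do
  # Hypothetical batch size, chosen small so the effect is easy to see.
  @max_children 2

  # Returns one copy of `request` per chunk of at most `max` children.
  # A request with no children yields an empty list in this sketch.
  def split(%{children: children} = request, max \\ @max_children) do
    children
    |> Enum.chunk_every(max)
    |> Enum.map(fn batch -> %{request | children: batch} end)
  end
end
```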

@spec store(ProcessHub.Hub.t(), t()) :: ProcessHub.Hub.t()

update(state, operation)

@spec update(ProcessHub.Hub.t(), t()) :: ProcessHub.Hub.t()

with_partition_check(hub, fun)

@spec with_partition_check(ProcessHub.Hub.t(), (-> term())) ::
  term() | {:error, :partitioned}

Wraps function execution with partition check.
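
The spec suggests a guard-style wrapper: run the zero-arity function when the cluster is healthy, otherwise return `{:error, :partitioned}` without running it. A minimal sketch of that shape follows. `PartitionSketch` is hypothetical, and it takes the partition status as a plain boolean because this page does not show how ProcessHub derives it from the hub.

```elixir
defmodule PartitionSketch do
  # Hypothetical: the caller supplies the partition status directly.
  def with_partition_check(partitioned?, fun)
      when is_boolean(partitioned?) and is_function(fun, 0) do
    if partitioned? do
      # Skip the work entirely while partitioned.
      {:error, :partitioned}
    else
      fun.()
    end
  end
end
```

Because the function's own return value is passed through unchanged, callers should match on `{:error, :partitioned}` before handling the normal result.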