ProcessHub.Service.RequestManager (ProcessHub v0.5.0-beta)
Unified module for distributed operation tracking and execution utilities.
This module provides:
- Operation struct for tracking distributed operations across nodes
- State management (store, get, update, remove from coordinator state)
- GenServer handlers for coordinator callbacks
- Execution utilities (partition check, send response, request factories)
- Tracking utilities (expired?, all_responded?, record_response)
- Request splitting for batching large requests
Types
@type t() :: %ProcessHub.Service.RequestManager{
  awaiter: {pid(), reference()} | nil,
  completed_nodes: MapSet.t(),
  expires_at: integer(),
  future: ProcessHub.Future.t() | nil,
  handler: module(),
  hub_id: ProcessHub.hub_id(),
  nodes_data: [{node(), [map()]}],
  options: keyword(),
  sub_requests: [struct()],
  transaction_id: reference()
}
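To make the shape of this type concrete, here is a minimal sketch of a struct with the same fields, plus two tracking helpers in the spirit of the `expired?` and `all_responded?` utilities named in the module overview. `OperationSketch` is a hypothetical stand-in, not the library's module, and the sketch assumes `expires_at` is a millisecond timestamp on the same clock as the `now` argument; the real time base is not stated on this page.

```elixir
defmodule OperationSketch do
  # Hypothetical stand-in; field names mirror the documented t() type.
  defstruct transaction_id: nil,
            hub_id: nil,
            handler: nil,
            nodes_data: [],
            sub_requests: [],
            completed_nodes: MapSet.new(),
            expires_at: 0,
            awaiter: nil,
            future: nil,
            options: []

  # Assumption: expires_at and now share the same clock and unit.
  def expired?(%__MODULE__{expires_at: expires_at}, now), do: now >= expires_at

  # True once every node listed in nodes_data appears in completed_nodes.
  def all_responded?(%__MODULE__{nodes_data: nodes_data, completed_nodes: done}) do
    nodes_data
    |> MapSet.new(fn {node, _children} -> node end)
    |> MapSet.subset?(done)
  end
end
```

Passing `now` explicitly keeps the helper pure, which makes expiry logic easy to exercise without waiting on a real clock.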
Functions
@spec cleanup_expired(ProcessHub.Hub.t()) :: ProcessHub.Hub.t()
Composes sub-requests for each target node.
contraction_request(hub, children_data, opts \\ [migration_add: true])
@spec contraction_request(ProcessHub.Hub.t(), [{ProcessHub.child_spec(), map()}], keyword()) :: ProcessHub.Request.Handler.StartChildrenRequest.t()
Factory: Creates a contraction request for topology contraction.
@spec finalize(t(), ProcessHub.Hub.t()) :: struct()
Finalizes an operation by aggregating results and running post-processing.
@spec get(ProcessHub.Hub.t(), reference()) :: t() | nil
@spec handle_await(ProcessHub.Hub.t(), reference(), {pid(), reference()}) :: {:reply, term(), ProcessHub.Hub.t()} | {:noreply, ProcessHub.Hub.t()}
@spec handle_response(ProcessHub.Hub.t(), reference(), node(), term()) :: {:noreply, ProcessHub.Hub.t()}
@spec handle_timeout(ProcessHub.Hub.t(), reference(), {pid(), reference()}) :: {:noreply, ProcessHub.Hub.t()}
@spec load_strategies(ProcessHub.Hub.t()) :: %{ sync: term(), dist: term(), redun: term(), migr: term() }
Loads all strategies from storage.
@spec migration_request( ProcessHub.Hub.t(), node(), [{ProcessHub.child_spec(), map()}], keyword() ) :: ProcessHub.Request.Handler.StartChildrenRequest.t()
Factory: Creates a migration request for topology expansion.
Creates a new Operation struct.
Groups children by target node into a keyword list [{node, [child_data, ...]}, ...].
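The grouping described above can be sketched with `Enum.group_by/2`. This is only an illustration: `GroupSketch` is hypothetical, and it assumes each child map carries its target node under a `:node` key, which this page does not confirm.

```elixir
defmodule GroupSketch do
  # Assumption: each child map stores its target node under a hypothetical :node key.
  # Returns a keyword list of the form [{node, [child_data, ...]}, ...].
  def group_by_node(children) do
    children
    |> Enum.group_by(fn child -> child.node end)
    |> Map.to_list()
  end
end
```

Because node names are atoms, the resulting list of `{node, children}` tuples is a valid keyword list, and children keep their original relative order within each group.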
Records a node's response and checks if operation is complete.
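As a rough sketch of how recording a response and checking completion might fit together: the module name `RecordSketch` and the `{status, operation}` return shape are assumptions for illustration, since the real function's signature is not shown on this page.

```elixir
defmodule RecordSketch do
  # Marks `node` as completed and reports whether every expected node has answered.
  # The {:complete | :pending, operation} return shape is an assumption.
  def record_response(%{completed_nodes: done, nodes_data: nodes_data} = op, node) do
    done = MapSet.put(done, node)
    expected = MapSet.new(nodes_data, fn {n, _children} -> n end)
    status = if MapSet.subset?(expected, done), do: :complete, else: :pending
    {status, %{op | completed_nodes: done}}
  end
end
```

Using a `MapSet` for completed nodes means a duplicate response from the same node does not change the outcome.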
@spec remove(ProcessHub.Hub.t(), reference()) :: ProcessHub.Hub.t()
Converts request struct to keyword options for response routing.
@spec send_response(atom(), keyword(), [{ProcessHub.child_id(), term()}]) :: :ok | :skip
Sends response to coordinator.
Splits large requests into smaller batches for efficient cross-node transmission.
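Batching of this kind can be sketched with `Enum.chunk_every/2`. The module name `SplitSketch` and the `batch_size` parameter are hypothetical; the page does not say how the library chooses the batch size or what a batch contains.

```elixir
defmodule SplitSketch do
  # Splits a list of children into batches of at most `batch_size` items.
  # `batch_size` is a hypothetical parameter, not a documented option.
  def split(children, batch_size) when is_integer(batch_size) and batch_size > 0 do
    Enum.chunk_every(children, batch_size)
  end
end
```

The last batch may be smaller than `batch_size`, and an empty input yields an empty list of batches.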
@spec store(ProcessHub.Hub.t(), t()) :: ProcessHub.Hub.t()
@spec update(ProcessHub.Hub.t(), t()) :: ProcessHub.Hub.t()
@spec with_partition_check(ProcessHub.Hub.t(), (-> term())) :: term() | {:error, :partitioned}
Wraps function execution with partition check.
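The wrapper pattern implied by the spec (run the function, or return `{:error, :partitioned}`) might look like the sketch below. It takes a plain boolean in place of the hub, because how the hub detects a partition is not described here; `PartitionSketch` and the `partitioned?` argument are hypothetical.

```elixir
defmodule PartitionSketch do
  # `partitioned?` is a hypothetical boolean standing in for the hub's own check.
  def with_partition_check(partitioned?, fun)
      when is_boolean(partitioned?) and is_function(fun, 0) do
    if partitioned? do
      {:error, :partitioned}
    else
      fun.()
    end
  end
end
```

Passing a zero-arity function defers the work, so nothing runs when the partition check fails.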