Configuration for the distributed execution.
Summary
Functions
Skip validation if the value is None when initialisation is delayed.
Python method ParallelConfig._validate_parallel_config.
Python method ParallelConfig._verify_args.
Provide a hash that uniquely identifies all the configs that affect the structure of the computation graph.
We might need to initialize process groups in multiple processes that are related to data parallelism.
Python method ParallelConfig.has_unfinished_dp.
Constructs ParallelConfig.
Python method ParallelConfig.stateless_init_dp_group.
Python method ParallelConfig.sync_kv_cache_memory_size.
Types
Functions
@spec _skip_none_validation(SnakeBridge.Ref.t(), term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Skip validation if the value is None when initialisation is delayed.
Parameters
- value (term())
- handler (term())
Returns
term()
@spec _validate_parallel_config( SnakeBridge.Ref.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python method ParallelConfig._validate_parallel_config.
Returns
term()
@spec _verify_args( SnakeBridge.Ref.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python method ParallelConfig._verify_args.
Returns
term()
@spec all2all_backend(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec compute_hash( SnakeBridge.Ref.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Provide a hash that uniquely identifies all the configs
that affect the structure of the computation graph from input ids/embeddings to the final hidden states, excluding anything before input ids/embeddings and after the final hidden states.
This hash is also used for DP worker configuration validation to prevent hangs from mismatched collective communication patterns.
Returns
term()
@spec cp_kv_cache_interleave_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_backend(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_external_lb(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_hybrid_lb(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_index(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_master_ip(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_master_port(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_rank(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_rank_local(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_rpc_port(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec data_parallel_size_local(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec dbo_decode_token_threshold(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec dbo_prefill_token_threshold(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec dcp_kv_cache_interleave_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec decode_context_parallel_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec disable_custom_all_reduce(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec disable_nccl_for_dp_synchronization(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec distributed_executor_backend(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec enable_dbo(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec enable_eplb(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec enable_expert_parallel(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec eplb_config(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec expert_placement_strategy(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec get_next_dp_init_port( SnakeBridge.Ref.t(), keyword() ) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
We might need to initialize process groups in multiple
processes that are related to data parallelism, e.g. both in the worker and in the engine, which can live in different processes. To avoid port conflicts, we pop a new port from the prepared port list each time we need to initialize a new process group related to data parallelism.
Returns
integer()
@spec has_unfinished_dp(SnakeBridge.Ref.t(), term(), boolean(), keyword()) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}
Python method ParallelConfig.has_unfinished_dp.
Parameters
- dp_group (term())
- has_unfinished (boolean())
Returns
boolean()
@spec is_moe_model(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec local_world_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec master_addr(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec master_port(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec max_parallel_loading_workers(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec new(term(), term(), term(), keyword()) :: {:ok, SnakeBridge.Ref.t()} | {:error, Snakepit.Error.t()}
Constructs ParallelConfig.
Parameters
- dataclass_self__ (term())
- args (term())
- kwargs (term())
@spec nnodes(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec nnodes_within_dp(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec node_rank(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec node_rank_within_dp(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec num_ubatches(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec pipeline_parallel_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec placement_group(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec prefill_context_parallel_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec rank(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec ray_runtime_env(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec ray_workers_use_nsight(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec sd_worker_cls(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec stateless_init_dp_group( SnakeBridge.Ref.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python method ParallelConfig.stateless_init_dp_group.
Returns
term()
@spec sync_kv_cache_memory_size(SnakeBridge.Ref.t(), term(), integer(), keyword()) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
Python method ParallelConfig.sync_kv_cache_memory_size.
Parameters
- dp_group (term())
- kv_cache_memory (integer())
Returns
integer()
@spec tensor_parallel_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec ubatch_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec use_ray(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec use_sequence_parallel_moe(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec use_ubatching(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec worker_cls(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec worker_extension_cls(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec world_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec world_size_across_dp(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}