Vllm.Config.ParallelConfig (VLLM v0.3.0)

Copy Markdown View Source

Configuration for the distributed execution.

Summary

Functions

Skip validation if the value is None when initialization is delayed.

Python method ParallelConfig._validate_parallel_config.

Python method ParallelConfig._verify_args.

Provide a hash that uniquely identifies all the configs

We might need to initialize process groups in multiple

Python method ParallelConfig.has_unfinished_dp.

Python method ParallelConfig.stateless_init_dp_group.

Python method ParallelConfig.sync_kv_cache_memory_size.

Types

t()

@opaque t()

Functions

_skip_none_validation(ref, value, handler, opts \\ [])

@spec _skip_none_validation(SnakeBridge.Ref.t(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Skip validation if the value is None when initialization is delayed.

Parameters

  • value (term())
  • handler (term())

Returns

  • term()

_validate_parallel_config(ref, opts \\ [])

@spec _validate_parallel_config(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method ParallelConfig._validate_parallel_config.

Returns

  • term()

_verify_args(ref, opts \\ [])

@spec _verify_args(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method ParallelConfig._verify_args.

Returns

  • term()

all2all_backend(ref)

@spec all2all_backend(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

compute_hash(ref, opts \\ [])

@spec compute_hash(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Provide a hash that uniquely identifies all the configs

that affect the structure of the computation graph from input ids/embeddings to the final hidden states, excluding anything before input ids/embeddings and after the final hidden states.

This hash is also used for DP worker configuration validation to prevent hangs from mismatched collective communication patterns.

Returns

  • term()

cp_kv_cache_interleave_size(ref)

@spec cp_kv_cache_interleave_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_backend(ref)

@spec data_parallel_backend(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_external_lb(ref)

@spec data_parallel_external_lb(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_hybrid_lb(ref)

@spec data_parallel_hybrid_lb(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_index(ref)

@spec data_parallel_index(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_master_ip(ref)

@spec data_parallel_master_ip(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_master_port(ref)

@spec data_parallel_master_port(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_rank(ref)

@spec data_parallel_rank(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_rank_local(ref)

@spec data_parallel_rank_local(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_rpc_port(ref)

@spec data_parallel_rpc_port(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_size(ref)

@spec data_parallel_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

data_parallel_size_local(ref)

@spec data_parallel_size_local(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

dbo_decode_token_threshold(ref)

@spec dbo_decode_token_threshold(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

dbo_prefill_token_threshold(ref)

@spec dbo_prefill_token_threshold(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

dcp_kv_cache_interleave_size(ref)

@spec dcp_kv_cache_interleave_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

decode_context_parallel_size(ref)

@spec decode_context_parallel_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

disable_custom_all_reduce(ref)

@spec disable_custom_all_reduce(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

disable_nccl_for_dp_synchronization(ref)

@spec disable_nccl_for_dp_synchronization(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

distributed_executor_backend(ref)

@spec distributed_executor_backend(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

enable_dbo(ref)

@spec enable_dbo(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

enable_eplb(ref)

@spec enable_eplb(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

enable_expert_parallel(ref)

@spec enable_expert_parallel(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

eplb_config(ref)

@spec eplb_config(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

expert_placement_strategy(ref)

@spec expert_placement_strategy(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

get_next_dp_init_port(ref, opts \\ [])

@spec get_next_dp_init_port(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, integer()} | {:error, Snakepit.Error.t()}

We might need to initialize process groups in multiple

processes that are related to data parallelism, e.g. both in the worker and in the engine, which can live in different processes. To avoid port conflicts, we pop a new port from the prepared port list each time we need to initialize a new process group related to data parallelism.

Returns

  • integer()

has_unfinished_dp(ref, dp_group, has_unfinished, opts \\ [])

@spec has_unfinished_dp(SnakeBridge.Ref.t(), term(), boolean(), keyword()) ::
  {:ok, boolean()} | {:error, Snakepit.Error.t()}

Python method ParallelConfig.has_unfinished_dp.

Parameters

  • dp_group (term())
  • has_unfinished (boolean())

Returns

  • boolean()

is_moe_model(ref)

@spec is_moe_model(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

local_world_size(ref)

@spec local_world_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

master_addr(ref)

@spec master_addr(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

master_port(ref)

@spec master_port(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

max_parallel_loading_workers(ref)

@spec max_parallel_loading_workers(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

new(dataclass_self__, args, kwargs, opts \\ [])

@spec new(term(), term(), term(), keyword()) ::
  {:ok, SnakeBridge.Ref.t()} | {:error, Snakepit.Error.t()}

Constructs ParallelConfig.

Parameters

  • dataclass_self__ (term())
  • args (term())
  • kwargs (term())

nnodes(ref)

@spec nnodes(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

nnodes_within_dp(ref)

@spec nnodes_within_dp(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

node_rank(ref)

@spec node_rank(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

node_rank_within_dp(ref)

@spec node_rank_within_dp(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

num_ubatches(ref)

@spec num_ubatches(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

pipeline_parallel_size(ref)

@spec pipeline_parallel_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

placement_group(ref)

@spec placement_group(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

prefill_context_parallel_size(ref)

@spec prefill_context_parallel_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

rank(ref)

@spec rank(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

ray_runtime_env(ref)

@spec ray_runtime_env(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

ray_workers_use_nsight(ref)

@spec ray_workers_use_nsight(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

sd_worker_cls(ref)

@spec sd_worker_cls(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

stateless_init_dp_group(ref, opts \\ [])

@spec stateless_init_dp_group(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method ParallelConfig.stateless_init_dp_group.

Returns

  • term()

sync_kv_cache_memory_size(ref, dp_group, kv_cache_memory, opts \\ [])

@spec sync_kv_cache_memory_size(SnakeBridge.Ref.t(), term(), integer(), keyword()) ::
  {:ok, integer()} | {:error, Snakepit.Error.t()}

Python method ParallelConfig.sync_kv_cache_memory_size.

Parameters

  • dp_group (term())
  • kv_cache_memory (integer())

Returns

  • integer()

tensor_parallel_size(ref)

@spec tensor_parallel_size(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

ubatch_size(ref)

@spec ubatch_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

use_ray(ref)

@spec use_ray(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

use_sequence_parallel_moe(ref)

@spec use_sequence_parallel_moe(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

use_ubatching(ref)

@spec use_ubatching(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

worker_cls(ref)

@spec worker_cls(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

worker_extension_cls(ref)

@spec worker_extension_cls(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

world_size(ref)

@spec world_size(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

world_size_across_dp(ref)

@spec world_size_across_dp(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}