Submodule bindings for vllm.distributed.
Version
- Requested: 0.14.0
- Observed at generation: 0.14.0
Runtime Options
All functions accept a __runtime__ option for controlling execution behavior:
Vllm.Distributed.some_function(args, __runtime__: [timeout: 120_000])

Supported runtime options
- :timeout - Call timeout in milliseconds (default: 120,000 ms / 2 minutes)
- :timeout_profile - Use a named profile (:default, :ml_inference, :batch_job, :streaming)
- :stream_timeout - Timeout for streaming operations (default: 1,800,000 ms / 30 minutes)
- :session_id - Override the session ID for this call
- :pool_name - Target a specific Snakepit pool (multi-pool setups)
- :affinity - Override session affinity (:hint, :strict_queue, :strict_fail_fast)
Timeout Profiles
- :default - 2 minute timeout for regular calls
- :ml_inference - 10 minute timeout for ML/LLM workloads
- :batch_job - Unlimited timeout for long-running jobs
- :streaming - 2 minute timeout, 30 minute stream_timeout
Example with timeout override
# For a long-running ML inference call
Vllm.Distributed.predict(data, __runtime__: [timeout_profile: :ml_inference])
# Or explicit timeout
Vllm.Distributed.predict(data, __runtime__: [timeout: 600_000])
# Route to a pool and enforce strict affinity
Vllm.Distributed.predict(data, __runtime__: [pool_name: :strict_pool, affinity: :strict_queue])

See SnakeBridge.Defaults for global timeout configuration.
Summary
Functions
Python binding for vllm.distributed.all_gather.
Python binding for vllm.distributed.all_gather_fake.
Python binding for vllm.distributed.all_reduce.
Python binding for vllm.distributed.all_reduce_fake.
Python binding for vllm.distributed.broadcast_tensor_dict.
Python binding for vllm.distributed.cleanup_dist_env_and_memory.
Python binding for vllm.distributed.destroy_distributed_environment.
Set the groups to none and destroy them.
torch.library.custom_op can have significant overhead because it needs to consider complicated dispatching logic.
Ensure that numerator is divisible by the denominator and return the division value.
Ensure that numerator is divisible by the denominator.
Helper to initialize model parallel groups if they are not initialized, or verify they match the expected sizes if they are.
Python binding for vllm.distributed.get_context_model_parallel_group.
Python binding for vllm.distributed.get_dcp_group.
Return my rank for the decode context model parallel group.
Return world size for the decode context model parallel group.
Python binding for vllm.distributed.get_distributed_init_method.
Python binding for vllm.distributed.get_dp_group.
Python binding for vllm.distributed.get_ep_group.
Python binding for vllm.distributed.get_inner_dp_world_group.
Return the total number of nodes in the distributed environment.
Python binding for vllm.distributed.get_pcp_group.
Python binding for vllm.distributed.get_pp_group.
Try to evenly distribute layers across partitions.
Python binding for vllm.distributed.get_tcp_uri.
Return my rank for the tensor model parallel group.
Return world size for the tensor model parallel group.
Python binding for vllm.distributed.get_tp_group.
Python binding for vllm.distributed.get_world_group.
graph_capture is a context manager which should surround the code that is capturing the CUDA graph.
This is a collective operation that returns whether each rank is in the same node as the source rank.
Python binding for vllm.distributed.init_distributed_environment.
Stateless init ProcessGroup with gloo backend compatible with different torch versions.
The main purpose of this function is to ensure that loggers are retrieved after the root vllm logger has been configured.
Python binding for vllm.distributed.init_model_parallel_group.
Python binding for vllm.distributed.init_world_group.
Initialize model parallel groups.
Check if the current process is the first rank globally across all parallelism strategies (PP, TP, DP, EP, etc.).
Check if the current process is the first local rank (rank 0 on its node).
Check if the installed torch version is >= the target version.
Python module attribute vllm.distributed.logger.
Check if tensor and pipeline parallel groups are initialized.
Patch the tp group temporarily until this function ends.
Python binding for vllm.distributed.patched_fused_scaled_matmul_reduce_scatter.
Python binding for vllm.distributed.patched_fused_scaled_matmul_reduce_scatter_fake.
Prepare the communication buffer for the model.
Python binding for vllm.distributed.reduce_scatter.
Python binding for vllm.distributed.reduce_scatter_fake.
Resolve an object by its fully-qualified class name.
Python binding for vllm.distributed.sched_yield.
Python binding for vllm.distributed.set_custom_all_reduce.
Split a tensor along its last dimension.
Destroy ProcessGroup returned by stateless_init_torch_distributed_process_group.
A replacement for torch.distributed.init_process_group that does not pollute the global state.
Suppress stdout from C libraries at the file descriptor level.
All-gather the input tensor across model parallel group.
All-reduce the input tensor across model parallel group.
Gather the input tensor across model parallel group.
Reduce-Scatter the input tensor across model parallel group.
Python module attribute vllm.distributed.USE_SCHED_YIELD.
Functions
@spec all_gather(term(), integer(), integer(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.all_gather.
Parameters
- tensor (term())
- dim (integer())
- world_size (integer())
- group_name (String.t())
Returns
term()
@spec all_gather_fake(term(), integer(), integer(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.all_gather_fake.
Parameters
- tensor (term())
- dim (integer())
- world_size (integer())
- group_name (String.t())
Returns
term()
@spec all_reduce(term(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.all_reduce.
Parameters
- tensor (term())
- group_name (String.t())
Returns
term()
@spec all_reduce_fake(term(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.all_reduce_fake.
Parameters
- tensor (term())
- group_name (String.t())
Returns
term()
@spec broadcast_tensor_dict() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.broadcast_tensor_dict.
Parameters
- tensor_dict (term(), default: None)
- src (integer(), default: 0)
Returns
term()
@spec broadcast_tensor_dict(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec broadcast_tensor_dict(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec broadcast_tensor_dict( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec broadcast_tensor_dict(term(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec broadcast_tensor_dict(term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec cleanup_dist_env_and_memory() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.cleanup_dist_env_and_memory.
Parameters
- shutdown_ray (boolean(), default: False)
Returns
term()
@spec cleanup_dist_env_and_memory(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec cleanup_dist_env_and_memory(boolean()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec cleanup_dist_env_and_memory( boolean(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec destroy_distributed_environment(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.destroy_distributed_environment.
Returns
term()
@spec destroy_model_parallel(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Set the groups to none and destroy them.
Returns
term()
@spec direct_register_custom_op(String.t(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
torch.library.custom_op can have significant overhead because it
needs to consider complicated dispatching logic. This function directly registers a custom op and dispatches it to the CUDA backend. See https://gist.github.com/youkaichao/ecbea9ec9fc79a45d2adce1784d7a9a5 for more details.
By default, the custom op is registered to the vLLM library. If you
want to register it to a different library, you can pass the library
object to the target_lib argument.
IMPORTANT: the lifetime of the operator is tied to the lifetime of the library object. If you want to bind the operator to a different library, make sure the library object is alive when the operator is used.
Parameters
- op_name (String.t())
- op_func (term())
- mutates_args (term(), default: None)
- fake_impl (term(), default: None)
- target_lib (term(), default: None)
- dispatch_key (term(), default: None)
- tags ({term(), term()}, default: ())
Returns
term()
@spec direct_register_custom_op(String.t(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec direct_register_custom_op(String.t(), term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec divide(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Ensure that numerator is divisible by the denominator and return
the division value.
Parameters
- numerator (term())
- denominator (term())
Returns
term()
@spec ensure_divisibility(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Ensure that numerator is divisible by the denominator.
Parameters
- numerator (term())
- denominator (term())
Returns
term()
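The contract shared by divide and ensure_divisibility can be sketched in plain Python (a minimal illustration of the behavior described above, not the vllm source):

```python
def ensure_divisibility(numerator: int, denominator: int) -> None:
    # Raise if the division would not be exact.
    if numerator % denominator != 0:
        raise ValueError(f"{numerator} is not divisible by {denominator}")

def divide(numerator: int, denominator: int) -> int:
    # Check divisibility first, then return the exact quotient.
    ensure_divisibility(numerator, denominator)
    return numerator // denominator
```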
@spec ensure_model_parallel_initialized(integer(), integer()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
Helper to initialize model parallel groups if they are not initialized,
or ensure tensor-parallel and pipeline-parallel sizes are equal to expected values if the model parallel groups are initialized.
Parameters
- tensor_model_parallel_size (integer())
- pipeline_model_parallel_size (integer())
- prefill_context_model_parallel_size (integer(), default: 1)
- decode_context_model_parallel_size (term(), default: 1)
- backend (term(), default: None)
Returns
nil
@spec ensure_model_parallel_initialized(integer(), integer(), keyword()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec ensure_model_parallel_initialized(integer(), integer(), integer()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec ensure_model_parallel_initialized(integer(), integer(), integer(), keyword()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec ensure_model_parallel_initialized(integer(), integer(), integer(), term()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec get_context_model_parallel_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_context_model_parallel_group.
Returns
term()
@spec get_dcp_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_dcp_group.
Returns
term()
@spec get_decode_context_model_parallel_rank(keyword()) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
Return my rank for the decode context model parallel group.
Returns
integer()
@spec get_decode_context_model_parallel_world_size(keyword()) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
Return world size for the decode context model parallel group.
Returns
integer()
@spec get_distributed_init_method(String.t(), integer(), keyword()) :: {:ok, String.t()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_distributed_init_method.
Parameters
- ip (String.t())
- port (integer())
Returns
String.t()
@spec get_dp_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_dp_group.
Returns
term()
@spec get_ep_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_ep_group.
Returns
term()
@spec get_inner_dp_world_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_inner_dp_world_group.
Returns
term()
@spec get_node_count(keyword()) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
Return the total number of nodes in the distributed environment.
Returns
integer()
@spec get_pcp_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_pcp_group.
Returns
term()
@spec get_pp_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_pp_group.
Returns
term()
@spec get_pp_indices(integer(), integer(), integer(), keyword()) :: {:ok, {integer(), integer()}} | {:error, Snakepit.Error.t()}
Try to evenly distribute layers across partitions.
If the number of layers is not divisible by the number of partitions, the remaining layers are evenly distributed across all but the last partition. The last partition is excluded because it often contains an additional norm layer and we are attempting to balance compute.
If pp_size > 2 and the number of remaining layers x satisfies
0 < x <= pp_size - 2, then the remaining layers are evenly distributed
across the middle partitions. The first and last partitions are excluded
because they contain the input and output embeddings respectively and we
are attempting to reduce maximum memory consumption across partitions.
Parameters
- num_hidden_layers (integer())
- pp_rank (integer())
- pp_size (integer())
Returns
{integer(), integer()}
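The balancing rule described above can be sketched as a partition-size calculation (illustrative only; the actual vllm function returns the (start, end) layer indices for a given pp_rank rather than the size list):

```python
def pp_partition_sizes(num_layers: int, pp_size: int) -> list[int]:
    # Base layers per partition, plus the layers left over.
    base, rem = divmod(num_layers, pp_size)
    sizes = [base] * pp_size
    if rem == 0:
        return sizes
    if pp_size > 2 and rem <= pp_size - 2:
        # Spread the remainder over the middle partitions only,
        # keeping the embedding-heavy first/last partitions smaller.
        for i in range(1, 1 + rem):
            sizes[i] += 1
    else:
        # Otherwise spread it over all but the last partition,
        # which often carries an extra norm layer.
        for i in range(rem):
            sizes[i] += 1
    return sizes
```

For example, 10 layers on 4 pipeline stages would yield [2, 3, 3, 2]: the two leftover layers land on the middle partitions.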
@spec get_tcp_uri(String.t(), integer(), keyword()) :: {:ok, String.t()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_tcp_uri.
Parameters
- ip (String.t())
- port (integer())
Returns
String.t()
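get_tcp_uri is a simple formatter; a plausible Python sketch (an assumption based on the name and signature, not copied from the vllm source):

```python
def get_tcp_uri(ip: str, port: int) -> str:
    # Format a host/port pair as a TCP URI for torch.distributed.
    return f"tcp://{ip}:{port}"
```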
@spec get_tensor_model_parallel_rank(keyword()) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
Return my rank for the tensor model parallel group.
Returns
integer()
@spec get_tensor_model_parallel_world_size(keyword()) :: {:ok, integer()} | {:error, Snakepit.Error.t()}
Return world size for the tensor model parallel group.
Returns
integer()
@spec get_tp_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_tp_group.
Returns
term()
@spec get_world_group(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.get_world_group.
Returns
term()
@spec graph_capture( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
graph_capture is a context manager which should surround the code that
is capturing the CUDA graph. Its main purpose is to ensure that some
operations will be run after the graph is captured, before the graph
is replayed. It returns a GraphCaptureContext object which contains the
necessary data for the graph capture. Currently, it only contains the
stream that the graph capture is running on. This stream is set to the
current CUDA stream when the context manager is entered and reset to the
default stream when the context manager is exited. This is to ensure that
the graph capture is running on a separate stream from the default stream,
in order to explicitly distinguish the kernels to capture
from other kernels possibly launched on background in the default stream.
Parameters
device(term())
Returns
term()
@spec in_the_same_node_as(term()) :: {:ok, [boolean()]} | {:error, Snakepit.Error.t()}
This is a collective operation that returns whether each rank is in the same node
as the source rank. It tests whether processes are attached to the same memory system (shared access to shared memory).
Parameters
- pg (term())
- source_rank (integer(), default: 0)
Returns
list(boolean())
@spec in_the_same_node_as( term(), keyword() ) :: {:ok, [boolean()]} | {:error, Snakepit.Error.t()}
@spec in_the_same_node_as(term(), integer()) :: {:ok, [boolean()]} | {:error, Snakepit.Error.t()}
@spec in_the_same_node_as(term(), integer(), keyword()) :: {:ok, [boolean()]} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.init_distributed_environment.
Parameters
- world_size (integer(), default: -1)
- rank (integer(), default: -1)
- distributed_init_method (String.t(), default: 'env://')
- local_rank (integer(), default: -1)
- backend (String.t(), default: 'nccl')
- timeout (term(), default: None)
Returns
term()
@spec init_distributed_environment(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment(integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment( integer(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment(integer(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment(integer(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment(integer(), integer(), String.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment( integer(), integer(), String.t(), integer(), String.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_distributed_environment( integer(), integer(), String.t(), integer(), String.t(), term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec init_gloo_process_group(term(), integer(), integer(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Stateless init ProcessGroup with gloo backend compatible with
different torch versions.
Parameters
- prefix_store (term())
- group_rank (integer())
- group_size (integer())
- timeout (term())
Returns
term()
@spec init_logger( String.t(), keyword() ) :: {:ok, Vllm.Logger.VllmLogger.t()} | {:error, Snakepit.Error.t()}
The main purpose of this function is to ensure that loggers are
retrieved in such a way that we can be sure the root vllm logger has already been configured.
Parameters
name(String.t())
Returns
Vllm.Logger.VllmLogger.t()
@spec init_model_parallel_group([[integer()]], integer(), String.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.init_model_parallel_group.
Parameters
- group_ranks (list(list(integer())))
- local_rank (integer())
- backend (String.t())
- use_message_queue_broadcaster (boolean(), default: False)
- group_name (term(), default: None)
- use_device_communicator (boolean(), default: True)
Returns
term()
@spec init_world_group([integer()], integer(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.init_world_group.
Parameters
- ranks (list(integer()))
- local_rank (integer())
- backend (String.t())
Returns
term()
@spec initialize_model_parallel() :: {:ok, nil} | {:error, Snakepit.Error.t()}
Initialize model parallel groups.
Parameters
- tensor_model_parallel_size - number of GPUs used for tensor model parallelism.
- pipeline_model_parallel_size - number of GPUs used for pipeline model parallelism.
- backend - name of torch distributed communication backend.
Returns
nil
@spec initialize_model_parallel(keyword()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel(integer()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel( integer(), keyword() ) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel(integer(), integer()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel(integer(), integer(), keyword()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel(integer(), integer(), integer()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel(integer(), integer(), integer(), keyword()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec initialize_model_parallel(integer(), integer(), integer(), term()) :: {:ok, nil} | {:error, Snakepit.Error.t()}
@spec is_global_first_rank(keyword()) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}
Check if the current process is the first rank globally across all
parallelism strategies (PP, TP, DP, EP, etc.).
Unlike group-specific checks like get_tensor_model_parallel_rank() == 0
or get_pp_group().is_first_rank, this function checks the global rank
across all parallelism dimensions.
Returns
Returns boolean(). True if this is the global first rank (rank 0), False otherwise. Returns True if distributed is not initialized (single process).
@spec is_local_first_rank(keyword()) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}
Check if the current process is the first local rank (rank 0 on its node).
Returns
boolean()
@spec is_torch_equal_or_newer( String.t(), keyword() ) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}
Check if the installed torch version is >= the target version.
Parameters
- target - a version string, like "2.6.0".
Returns
boolean()
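A dependency-free sketch of the comparison (the real implementation likely uses proper version parsing; this simplified version only handles plain numeric versions like "2.6.0", optionally with a local build suffix):

```python
def version_tuple(version: str) -> tuple[int, ...]:
    # Drop any local suffix ("2.6.0+cu121" -> "2.6.0") and parse numerically.
    return tuple(int(part) for part in version.split("+")[0].split("."))

def is_equal_or_newer(installed: str, target: str) -> bool:
    # Tuple comparison gives the usual lexicographic version ordering.
    return version_tuple(installed) >= version_tuple(target)
```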
@spec logger() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python module attribute vllm.distributed.logger.
Returns
term()
@spec model_parallel_is_initialized(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Check if tensor and pipeline parallel groups are initialized.
Returns
term()
@spec patch_tensor_parallel_group( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Patch the tp group temporarily until this function ends.
This method is for draft workers of speculative decoding to run draft model with different tp degree from that of target model workers.
Parameters
- tp_group - the tp group coordinator (type: GroupCoordinator)
Returns
term()
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()] ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.patched_fused_scaled_matmul_reduce_scatter.
Parameters
- a (term())
- b (term())
- a_scale (term())
- b_scale (term())
- reduce_op (String.t())
- orig_scatter_dim (integer())
- scatter_dim_after_maybe_reshape (integer())
- group_name (String.t())
- output_shape (list(integer()))
- bias (term(), default: None)
- result_scale (term(), default: None)
- out_dtype (term(), default: None)
- use_fast_accum (boolean(), default: False)
Returns
term()
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), term(), boolean() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()] ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.patched_fused_scaled_matmul_reduce_scatter_fake.
Parameters
- a (term())
- b (term())
- a_scale (term())
- b_scale (term())
- reduce_op (String.t())
- orig_scatter_dim (integer())
- scatter_dim_after_maybe_reshape (integer())
- group_name (String.t())
- output_shape (list(integer()))
- bias (term(), default: None)
- result_scale (term(), default: None)
- out_dtype (term(), default: None)
- use_fast_accum (boolean(), default: False)
Returns
term()
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), term() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec patched_fused_scaled_matmul_reduce_scatter_fake( term(), term(), term(), term(), String.t(), integer(), integer(), String.t(), [integer()], term(), term(), term(), boolean() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec prepare_communication_buffer_for_model( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Prepare the communication buffer for the model.
Traditional communication libraries like NCCL are almost model agnostic. However, emerging new communication libraries like MoE all2all (DeepEP) usually allocate the communication buffer based on the model shape for optimal performance.
Parameters
model(term())
Returns
term()
@spec reduce_scatter(term(), integer(), integer(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.reduce_scatter.
Parameters
- tensor (term())
- dim (integer())
- world_size (integer())
- group_name (String.t())
Returns
term()
@spec reduce_scatter_fake(term(), integer(), integer(), String.t(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.reduce_scatter_fake.
Parameters
- tensor (term())
- dim (integer())
- world_size (integer())
- group_name (String.t())
Returns
term()
@spec resolve_obj_by_qualname( String.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Resolve an object by its fully-qualified class name.
Parameters
qualname(String.t())
Returns
term()
@spec sched_yield(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.sched_yield.
Returns
term()
@spec set_custom_all_reduce( boolean(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for vllm.distributed.set_custom_all_reduce.
Parameters
enable(boolean())
Returns
term()
@spec split_tensor_along_last_dim(term(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Split a tensor along its last dimension.
Parameters
- tensor - input tensor.
- num_partitions - number of partitions to split the tensor.
- contiguous_split_chunks - If True, make each chunk contiguous in memory.
Returns
term()
@spec split_tensor_along_last_dim(term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec split_tensor_along_last_dim(term(), integer(), boolean()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
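The splitting behavior can be illustrated with a pure-Python analogue on nested lists (the real function operates on torch tensors; this sketch only mirrors the shape semantics):

```python
def split_last_dim(rows: list[list[int]], num_partitions: int) -> list[list[list[int]]]:
    # Each partition gets an equal slice of the last dimension;
    # the width must divide evenly, mirroring ensure_divisibility.
    width = len(rows[0])
    assert width % num_partitions == 0, "last dim must divide evenly"
    step = width // num_partitions
    return [
        [row[i:i + step] for row in rows]
        for i in range(0, width, step)
    ]
```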
@spec stateless_destroy_torch_distributed_process_group( term(), keyword() ) :: {:ok, nil} | {:error, Snakepit.Error.t()}
Destroy ProcessGroup returned by
stateless_init_torch_distributed_process_group().
Parameters
pg(term())
Returns
nil
@spec stateless_init_torch_distributed_process_group( String.t(), integer(), integer(), integer(), String.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
A replacement for torch.distributed.init_process_group that does not
pollute the global state. The created ProcessGroup object can be used for
some operations such as allreduce, because it does not depend on the
global rank. However, some operations such as broadcast cannot be used
because it depends on the global rank.
TODO: ask for help from PyTorch team if we need the broadcast operation.
This function is useful when we are not sure about the total number of processes in the process group. For example, we may have process 1, 2, ..., 8 who want to communicate, and process 9 might be the same process as process 1, or it might be a different process; process 10 might be the same process as process 5, or it might be a different process. In this case, how can we reliably form a communication channel within process 9 and 10, without affecting the communication channel within process 1, 2, ..., 8?
One possible solution is to figure out if process 9 and 10 are the same as process 1 and 5 beforehand, and then form a communication channel based on the information, adjusting the ranks and world_size etc. However, figuring out the information is not always easy, and it will interfere with the main communication channel.
Our solution is to always form a communication channel with process 1, 2, ..., 8, and then use this function to form another communication channel with process 9 and 10. This way, regardless of whether process 9 and 10 are the same as process 1 and 5, the main communication channel is always formed with process 1, 2, ..., 8, and the additional communication channel is formed with process 9 and 10.
Parameters
- host (String.t())
- port (integer())
- rank (integer())
- world_size (integer())
- backend (String.t())
Returns
term()
@spec suppress_stdout(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Suppress stdout from C libraries at the file descriptor level.
Only suppresses stdout, not stderr, to preserve error messages. Suppression is disabled when VLLM_LOGGING_LEVEL is set to DEBUG.
Examples
with suppress_stdout():
    # C library calls that would normally print to stdout
    torch.distributed.new_group(ranks, backend="gloo")
Returns
term()
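The file-descriptor-level trick can be sketched as a minimal stand-alone context manager (an illustration of the technique, not the vllm implementation; it omits the VLLM_LOGGING_LEVEL check):

```python
import contextlib
import os

@contextlib.contextmanager
def suppress_stdout_fd():
    # Duplicate the real stdout fd, point fd 1 at /dev/null, then
    # restore it on exit. This silences C libraries that write to
    # fd 1 directly, bypassing Python's sys.stdout.
    saved = os.dup(1)
    devnull = os.open(os.devnull, os.O_WRONLY)
    try:
        os.dup2(devnull, 1)
        yield
    finally:
        os.dup2(saved, 1)
        os.close(devnull)
        os.close(saved)
```

Redirecting at the descriptor level (rather than reassigning sys.stdout) is what makes this work for native libraries, which never consult the Python-level stdout object.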
@spec tensor_model_parallel_all_gather(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
All-gather the input tensor across model parallel group.
Parameters
- input_ (term())
- dim (integer(), default: -1)
Returns
term()
@spec tensor_model_parallel_all_gather( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_all_gather(term(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_all_gather(term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_all_reduce( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
All-reduce the input tensor across model parallel group.
Parameters
input_(term())
Returns
term()
@spec tensor_model_parallel_gather(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Gather the input tensor across model parallel group.
Parameters
- input_ (term())
- dst (integer(), default: 0)
- dim (integer(), default: -1)
Returns
term()
@spec tensor_model_parallel_gather( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_gather(term(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_gather(term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_gather(term(), integer(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_reduce_scatter(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Reduce-Scatter the input tensor across model parallel group.
Parameters
- input_ (term())
- dim (integer(), default: -1)
Returns
term()
@spec tensor_model_parallel_reduce_scatter( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_reduce_scatter(term(), integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec tensor_model_parallel_reduce_scatter(term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec use_sched_yield() :: {:ok, boolean()} | {:error, Snakepit.Error.t()}
Python module attribute vllm.distributed.USE_SCHED_YIELD.
Returns
boolean()