Vllm.Distributed.StatelessProcessGroup (VLLM v0.3.0)

A dataclass that holds a metadata store together with the rank and world_size of the group. Use it only to communicate metadata between processes; for data-plane communication, create NCCL-related objects.

Summary

Functions

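all_gather_obj(ref, obj, opts \\ [])

All gather an object from all ranks.

barrier(ref, args, opts \\ [])

A robust barrier to synchronize all ranks.

broadcast_obj(ref, obj, src, opts \\ [])

Broadcast an object from a source rank to all other ranks.

broadcast_send_counter(ref)

create(ref, host, port, rank, world_size, args, opts \\ [])

A replacement for torch.distributed.init_process_group that does not pollute the global state.

data_expiration_seconds(ref)

expire_data(ref, opts \\ [])

Expire data that is older than data_expiration_seconds seconds.

new(rank, world_size, store, socket, args, opts \\ [])

Initialize self. See help(type(self)) for accurate signature.

recv_obj(ref, src, opts \\ [])

Receive an object from a source rank.

send_obj(ref, obj, dst, opts \\ [])

Send an object to a destination rank.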

Types

t()

@opaque t()

Functions

all_gather_obj(ref, obj, opts \\ [])

@spec all_gather_obj(SnakeBridge.Ref.t(), term(), keyword()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

All-gather an object from all ranks: every rank contributes one object and receives the list of objects contributed by all ranks.
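To make the semantics concrete, here is a minimal Python sketch (Python being the language of the wrapped class). It is not the wrapped implementation: the name `all_gather_sim` is hypothetical, all ranks are simulated inside one process, and the result is assumed to be ordered by rank.

```python
from typing import Any, List


def all_gather_sim(objs_by_rank: List[Any]) -> List[List[Any]]:
    """Simulate an all-gather; index i of the input is the object held by rank i."""
    world_size = len(objs_by_rank)
    # Every rank ends up with its own copy of the complete list.
    return [list(objs_by_rank) for _ in range(world_size)]


# Three simulated ranks, each contributing a small metadata dict.
results = all_gather_sim([{"rank": 0}, {"rank": 1}, {"rank": 2}])
```

In this sketch `results[i]` is what rank `i` would see, and all entries are equal.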

Parameters

  • obj (term())

Returns

  • list(term())

barrier(ref, args, opts \\ [])

@spec barrier(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

A robust barrier to synchronize all ranks.

Uses a multi-phase approach to ensure all processes reach the barrier before proceeding:

  1. Each process signals that it has reached the barrier.

  2. Each process signals that it has confirmed the arrival of all other ranks.

  3. Rank 0 waits for all other ranks to signal their departure, which ensures that every other rank has left the barrier before rank 0 does.
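The three phases can be sketched in Python with threads standing in for processes. This is an illustrative model under stated assumptions, not the wrapped implementation: `InMemoryStore`, the `arrive_*` and `depart_*` key names, and the choice of `RuntimeError` on timeout are all hypothetical.

```python
import threading
import time


class InMemoryStore:
    """Hypothetical stand-in for the metadata store: a thread-safe set of keys."""

    def __init__(self):
        self._keys = set()
        self._cond = threading.Condition()

    def set(self, key):
        with self._cond:
            self._keys.add(key)
            self._cond.notify_all()

    def wait(self, keys, timeout):
        """Block until every key exists, or raise after `timeout` seconds."""
        deadline = time.monotonic() + timeout
        with self._cond:
            while not all(k in self._keys for k in keys):
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise RuntimeError(f"timed out waiting for {keys}")
                self._cond.wait(remaining)


def barrier(store, rank, world_size, timeout=5.0):
    # Phase 1: announce arrival, then wait until every rank has arrived.
    store.set(f"arrive_{rank}")
    store.wait([f"arrive_{r}" for r in range(world_size)], timeout)
    # Phase 2: signal that this rank has confirmed all arrivals and is leaving.
    store.set(f"depart_{rank}")
    # Phase 3: only rank 0 waits for every departure signal.
    if rank == 0:
        store.wait([f"depart_{r}" for r in range(world_size)], timeout)


def run_demo(world_size=4):
    store, passed, lock = InMemoryStore(), [], threading.Lock()

    def worker(rank):
        time.sleep(0.01 * rank)  # stagger arrivals
        barrier(store, rank, world_size)
        with lock:
            passed.append(rank)

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(passed)
```

Calling `run_demo(4)` lets all four simulated ranks through; calling `barrier` for a single rank of a two-rank group raises once the timeout elapses, because the other rank never arrives.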

Parameters

  • timeout - Maximum time, in seconds, to wait for each phase

Raises

Returns

  • term()

broadcast_obj(ref, obj, src, opts \\ [])

@spec broadcast_obj(SnakeBridge.Ref.t(), term(), integer(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Broadcast an object from a source rank to all other ranks.

The broadcast data is not cleaned up after all ranks have received it, so call this function only a limited number of times, e.g., during initialization.
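The following Python sketch illustrates why repeated broadcasts leave data behind. It is a toy model, not the wrapped implementation: `ToyBroadcaster` and the `broadcast_from/<src>/<n>` key scheme are assumptions, and the two counters are assumed to act as sequence numbers (their names are borrowed from the fields listed under `new`).

```python
import pickle
from collections import defaultdict


class ToyBroadcaster:
    """Hypothetical single-process model of one rank's view of a shared store."""

    def __init__(self, rank, store):
        self.rank = rank
        self.store = store  # shared dict standing in for the metadata store
        self.broadcast_send_counter = 0  # sequence number of this rank's next broadcast
        self.broadcast_recv_src_counter = defaultdict(int)  # next number expected per source

    def broadcast_obj(self, obj, src):
        if self.rank == src:
            # The source publishes under a fresh key and never deletes it.
            key = f"broadcast_from/{src}/{self.broadcast_send_counter}"
            self.store[key] = pickle.dumps(obj)
            self.broadcast_send_counter += 1
            return obj
        # Receivers read the next unread key from that source.
        key = f"broadcast_from/{src}/{self.broadcast_recv_src_counter[src]}"
        self.broadcast_recv_src_counter[src] += 1
        return pickle.loads(self.store[key])


store = {}
ranks = [ToyBroadcaster(r, store) for r in range(3)]
ranks[0].broadcast_obj({"port": 1234}, src=0)  # in this toy the source must publish first
received = [r.broadcast_obj(None, src=0) for r in ranks[1:]]
ranks[0].broadcast_obj("second", src=0)
leftover_keys = sorted(store)
```

After two broadcasts, `leftover_keys` still contains both entries even though every receiver has read the first one, which is the accumulation the note above warns about.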

Parameters

  • obj (term())
  • src (integer())

Returns

  • term()

broadcast_send_counter(ref)

@spec broadcast_send_counter(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

create(ref, host, port, rank, world_size, args, opts \\ [])

@spec create(
  SnakeBridge.Ref.t(),
  String.t(),
  integer(),
  integer(),
  integer(),
  [term()],
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

A replacement for torch.distributed.init_process_group that does not pollute the global state.

Suppose process A and process B have called torch.distributed.init_process_group to form a group, and we then want to form another group with processes A, B, C, and D. This is not possible in PyTorch, because A and B have already formed a group, and C and D cannot join it. This function is a workaround for that limitation.

torch.distributed.init_process_group is a global call, while this function is a stateless call. It will return a StatelessProcessGroup object that can be used for exchanging metadata. With this function, process A and process B can call StatelessProcessGroup.create to form a group, and then process A, B, C, and D can call StatelessProcessGroup.create to form another group.
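The difference between a global call and a stateless call can be modelled with a small Python toy. Nothing here touches PyTorch or vLLM: `ToyGroup`, `init_global_group`, and `create_group` are hypothetical names, and the module-level variable only imitates the idea of global state.

```python
class ToyGroup:
    """Hypothetical stand-in for a process group; it only records its members."""

    def __init__(self, members):
        self.members = tuple(members)


_default_group = None  # module-level state, modelling a "global call"


def init_global_group(members):
    """Global-style init: a second call fails because the state is already set."""
    global _default_group
    if _default_group is not None:
        raise RuntimeError("default group already initialized")
    _default_group = ToyGroup(members)
    return _default_group


def create_group(members):
    """Stateless-style create: returns a fresh object and touches no globals."""
    return ToyGroup(members)


init_global_group(["A", "B"])
try:
    init_global_group(["A", "B", "C", "D"])
    second_global_ok = True
except RuntimeError:
    second_global_ok = False

# The stateless variant can form overlapping groups side by side.
small = create_group(["A", "B"])
large = create_group(["A", "B", "C", "D"])
```

In the toy, the second global initialization is rejected, while the two stateless groups coexist because each is just a returned object.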

Parameters

  • host (String.t())
  • port (integer())
  • rank (integer())
  • world_size (integer())
  • data_expiration_seconds (integer() default: 3600)
  • store_timeout (integer() default: 300)

Returns

  • term()

data_expiration_seconds(ref)

@spec data_expiration_seconds(SnakeBridge.Ref.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

expire_data(ref, opts \\ [])

@spec expire_data(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Expire data that is older than data_expiration_seconds seconds.
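A minimal Python sketch of age-based expiry follows. It is an illustration under assumptions, not the wrapped implementation: `ToyExpiringStore`, its `put` method, and the injectable `clock` are hypothetical, and `entries` is assumed to be a queue of `(key, timestamp)` pairs ordered oldest first.

```python
import time
from collections import deque


class ToyExpiringStore:
    """Hypothetical store that forgets keys older than data_expiration_seconds."""

    def __init__(self, data_expiration_seconds, clock=time.time):
        self.data_expiration_seconds = data_expiration_seconds
        self.clock = clock  # injectable so the example is deterministic
        self.store = {}
        self.entries = deque()  # (key, insertion timestamp), oldest first

    def put(self, key, value):
        self.store[key] = value
        self.entries.append((key, self.clock()))

    def expire_data(self):
        now = self.clock()
        # Entries are time-ordered, so stop at the first one that is still fresh.
        while self.entries and now - self.entries[0][1] > self.data_expiration_seconds:
            key, _ = self.entries.popleft()
            self.store.pop(key, None)


fake_now = [0.0]
s = ToyExpiringStore(data_expiration_seconds=10, clock=lambda: fake_now[0])
s.put("old", 1)      # written at t = 0
fake_now[0] = 8.0
s.put("new", 2)      # written at t = 8
fake_now[0] = 12.0
s.expire_data()      # at t = 12, "old" is 12 s old, "new" is 4 s old
remaining = sorted(s.store)
```

Only the entry older than the 10-second limit is dropped; the newer one survives.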

Returns

  • term()

new(rank, world_size, store, socket, args, opts \\ [])

@spec new(integer(), integer(), term(), term(), [term()], keyword()) ::
  {:ok, SnakeBridge.Ref.t()} | {:error, Snakepit.Error.t()}

Initialize self. See help(type(self)) for accurate signature.

Parameters

  • rank (integer())
  • world_size (integer())
  • store (term())
  • socket (term())
  • data_expiration_seconds (integer() default: 3600)
  • send_dst_counter (%{optional(integer()) => integer()} default: <factory>)
  • recv_src_counter (%{optional(integer()) => integer()} default: <factory>)
  • broadcast_send_counter (integer() default: 0)
  • broadcast_recv_src_counter (%{optional(integer()) => integer()} default: <factory>)
  • entries (term() default: <factory>)

recv_obj(ref, src, opts \\ [])

@spec recv_obj(SnakeBridge.Ref.t(), integer(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Receive an object from a source rank.

Parameters

  • src (integer())

Returns

  • term()

send_obj(ref, obj, dst, opts \\ [])

@spec send_obj(SnakeBridge.Ref.t(), term(), integer(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Send an object to a destination rank.
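How send_obj and recv_obj can pair up through a shared store is sketched below in Python. This is a toy model, not the wrapped implementation: `ToyPeer` and the `send/<src>/to/<dst>/<n>` key scheme are assumptions, and `send_dst_counter` and `recv_src_counter` (names borrowed from the fields listed under `new`) are assumed to be per-peer sequence numbers.

```python
import pickle
from collections import defaultdict


class ToyPeer:
    """Hypothetical single-process model of one rank exchanging objects via a store."""

    def __init__(self, rank, store):
        self.rank = rank
        self.store = store  # shared dict standing in for the metadata store
        self.send_dst_counter = defaultdict(int)  # messages sent so far, per destination
        self.recv_src_counter = defaultdict(int)  # messages received so far, per source

    def send_obj(self, obj, dst):
        key = f"send/{self.rank}/to/{dst}/{self.send_dst_counter[dst]}"
        self.store[key] = pickle.dumps(obj)
        self.send_dst_counter[dst] += 1

    def recv_obj(self, src):
        # The toy does not block; the sender must have written the key already.
        key = f"send/{src}/to/{self.rank}/{self.recv_src_counter[src]}"
        self.recv_src_counter[src] += 1
        return pickle.loads(self.store[key])


store = {}
a, b = ToyPeer(0, store), ToyPeer(1, store)
a.send_obj("first", dst=1)
a.send_obj({"n": 2}, dst=1)
got = [b.recv_obj(src=0), b.recv_obj(src=0)]
```

Because both sides advance matching counters, the receiver reads the messages in the order they were sent.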

Parameters

  • obj (term())
  • dst (integer())

Returns

  • term()