A dataclass that holds a metadata store together with the rank and world_size of the
group. Use it only to exchange metadata between processes. For data-plane communication, create NCCL-related objects instead.
Summary
Functions
All gather an object from all ranks.
A robust barrier to synchronize all ranks.
Broadcast an object from a source rank to all other ranks.
A replacement for torch.distributed.init_process_group that does not pollute the global state.
Expire data that is older than data_expiration_seconds seconds.
Initialize self. See help(type(self)) for accurate signature.
Receive an object from a source rank.
Send an object to a destination rank.
Types
Functions
@spec all_gather_obj(SnakeBridge.Ref.t(), term(), keyword()) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
All gather an object from all ranks.
Parameters
obj(term())
Returns
list(term())
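The all-gather semantics can be illustrated with a minimal Python sketch. It assumes a hypothetical in-memory `ToyStore` in place of the real metadata store and simulates each rank with a thread. The key names and helper functions are illustrative, not the library's own.

```python
import pickle
import threading


class ToyStore:
    """Hypothetical in-memory stand-in for the group's metadata store."""

    def __init__(self):
        self._data = {}
        self._cond = threading.Condition()

    def set(self, key, value):
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()

    def get(self, key, timeout=5.0):
        # Block until the key exists, like a blocking key-value store read.
        with self._cond:
            if not self._cond.wait_for(lambda: key in self._data, timeout):
                raise TimeoutError(key)
            return self._data[key]


def all_gather_sketch(store, rank, world_size, obj):
    # Publish this rank's object, then read every rank's object in rank order.
    store.set(f"gather/{rank}", pickle.dumps(obj))
    return [pickle.loads(store.get(f"gather/{r}")) for r in range(world_size)]


def run_all_gather(world_size):
    # Simulate each rank with a thread sharing one store.
    store = ToyStore()
    results = [None] * world_size

    def worker(rank):
        results[rank] = all_gather_sketch(store, rank, world_size, {"rank": rank})

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Calling `run_all_gather(3)` gives every simulated rank the same list, ordered by rank index.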
@spec barrier(SnakeBridge.Ref.t(), [term()], keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
A robust barrier to synchronize all ranks.
Uses a multi-phase approach to ensure all processes reach the barrier before proceeding:
1. Each process signals it has reached the barrier.
2. Each process signals that it has confirmed the arrival of all other ranks.
3. Rank 0 waits for all other ranks to signal their departure to ensure that all ranks have departed the barrier first.
Parameters
timeout - Maximum time, in seconds, to wait for each phase
Raises
RuntimeError - If coordination fails or times out
Returns
term()
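The multi-phase barrier described above can be sketched in Python as follows. This is a minimal illustration, not the library's implementation: `ToyStore`, the key layout, and the `barrier_id` argument are all assumptions made for the example, and ranks are simulated with threads.

```python
import threading


class ToyStore:
    """Hypothetical in-memory stand-in for the group's metadata store."""

    def __init__(self):
        self._data = {}
        self._cond = threading.Condition()

    def set(self, key, value):
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()

    def get(self, key, timeout=5.0):
        # Block until the key exists or the timeout elapses.
        with self._cond:
            if not self._cond.wait_for(lambda: key in self._data, timeout):
                raise TimeoutError(key)
            return self._data[key]


def barrier_sketch(store, rank, world_size, barrier_id, timeout=5.0):
    prefix = f"barrier/{barrier_id}"
    try:
        # Phase 1: announce arrival, then wait until every rank has arrived.
        store.set(f"{prefix}/arrive/{rank}", b"1")
        for r in range(world_size):
            store.get(f"{prefix}/arrive/{r}", timeout)
        # Phase 2: signal that this rank confirmed all arrivals and is departing.
        store.set(f"{prefix}/depart/{rank}", b"1")
        # Phase 3: rank 0 waits for every other rank's departure signal.
        if rank == 0:
            for r in range(1, world_size):
                store.get(f"{prefix}/depart/{r}", timeout)
    except TimeoutError as exc:
        raise RuntimeError(f"barrier {barrier_id} timed out at rank {rank}") from exc


def run_barrier(world_size):
    # Record what each simulated rank does before and after the barrier.
    store = ToyStore()
    events = []

    def worker(rank):
        events.append(("before", rank))
        barrier_sketch(store, rank, world_size, barrier_id=0)
        events.append(("after", rank))

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events
```

In this sketch no rank records its "after" event until every rank has recorded "before", and a missing peer surfaces as a RuntimeError once the per-phase timeout elapses.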
@spec broadcast_obj(SnakeBridge.Ref.t(), term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Broadcast an object from a source rank to all other ranks.
The broadcast data is not cleaned up once all ranks have received the object, so call this function only a limited number of times, e.g., during initialization.
Parameters
obj(term())
src(integer())
Returns
term()
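To see why repeated broadcasts accumulate data, consider this minimal Python sketch. It assumes a plain dict as the store and a counter-based key scheme. The class, key names, and counters are hypothetical and only loosely mirror the `broadcast_send_counter` and `broadcast_recv_src_counter` fields listed for this type.

```python
import pickle


class BroadcastSketch:
    """Hypothetical per-rank helper; a plain dict stands in for the store."""

    def __init__(self, store, rank, world_size):
        self.store = store
        self.rank = rank
        self.world_size = world_size
        self.broadcast_send_counter = 0
        self.broadcast_recv_src_counter = {r: 0 for r in range(world_size)}

    def broadcast_obj(self, obj, src):
        if self.rank == src:
            # The source writes the object under a fresh, numbered key.
            key = f"broadcast_from/{src}/{self.broadcast_send_counter}"
            self.store[key] = pickle.dumps(obj)
            self.broadcast_send_counter += 1
            return obj
        # Receivers read the next unread key from that source.
        # A real store would block here until the key appears.
        key = f"broadcast_from/{src}/{self.broadcast_recv_src_counter[src]}"
        self.broadcast_recv_src_counter[src] += 1
        return pickle.loads(self.store[key])


def demo_broadcast(messages, world_size=3):
    # The source rank (0) writes first, then the other ranks read.
    store = {}
    ranks = [BroadcastSketch(store, r, world_size) for r in range(world_size)]
    received = []
    for msg in messages:
        ranks[0].broadcast_obj(msg, src=0)
        received.append([ranks[r].broadcast_obj(None, src=0) for r in range(1, world_size)])
    return received, sorted(store)
```

Nothing in this sketch deletes a key after the last receiver has read it, so the store grows by one entry per broadcast.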
@spec broadcast_send_counter(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec create( SnakeBridge.Ref.t(), String.t(), integer(), integer(), integer(), [term()], keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
A replacement for torch.distributed.init_process_group that does not
pollute the global state.
If process A and process B have already called torch.distributed.init_process_group
to form a group, PyTorch cannot then form a second group containing processes A, B,
C, and D: A and B already belong to a group, and C and D cannot join it. This
function works around that limitation.
torch.distributed.init_process_group is a global call, while this function
is a stateless call. It will return a StatelessProcessGroup object that can be
used for exchanging metadata. With this function, process A and process B
can call StatelessProcessGroup.create to form a group, and then process A, B,
C, and D can call StatelessProcessGroup.create to form another group.
Parameters
host(String.t())
port(integer())
rank(integer())
world_size(integer())
data_expiration_seconds(integer() default: 3600)
store_timeout(integer() default: 300)
Returns
term()
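The difference between a global call and a stateless call can be sketched in Python. In this illustration, `GroupSketch`, the `_STORES` registry, and the host and port values are all hypothetical. A dict per endpoint stands in for a network-backed store, and nothing here reflects the library's internals.

```python
from dataclasses import dataclass, field

# Hypothetical registry: one in-memory store per (host, port) endpoint,
# standing in for a real network-backed store.
_STORES = {}


@dataclass
class GroupSketch:
    rank: int
    world_size: int
    store: dict = field(repr=False)

    @classmethod
    def create(cls, host, port, rank, world_size):
        # No module-level "current group" is set; the caller owns the result.
        store = _STORES.setdefault((host, port), {})
        return cls(rank=rank, world_size=world_size, store=store)


# From process A's point of view: join a two-member group (A, B) ...
group_ab = GroupSketch.create("127.0.0.1", 29500, rank=0, world_size=2)
# ... and later a four-member group (A, B, C, D) on another port.
group_abcd = GroupSketch.create("127.0.0.1", 29501, rank=0, world_size=4)
```

Because each call returns an independent object, the same process can hold both groups at once, and data written through one does not appear in the other.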
@spec data_expiration_seconds(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec expire_data( SnakeBridge.Ref.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Expire data that is older than data_expiration_seconds seconds.
Returns
term()
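One plausible way to expire old data is to record each key with its write time and drop entries from the oldest end. The Python sketch below assumes that approach. The `ExpiringEntries` class, its `put` method, and the explicit `now` argument (added so the behaviour can be checked without sleeping) are illustrative assumptions.

```python
import time
from collections import deque


class ExpiringEntries:
    """Hypothetical helper; a plain dict stands in for the metadata store."""

    def __init__(self, store, data_expiration_seconds=3600):
        self.store = store
        self.data_expiration_seconds = data_expiration_seconds
        self.entries = deque()  # (key, timestamp) pairs, oldest first

    def put(self, key, value, now=None):
        now = time.time() if now is None else now
        self.store[key] = value
        self.entries.append((key, now))

    def expire_data(self, now=None):
        now = time.time() if now is None else now
        # Entries are in write order, so stop at the first one still fresh.
        while self.entries:
            key, timestamp = self.entries[0]
            if now - timestamp <= self.data_expiration_seconds:
                break
            self.store.pop(key, None)
            self.entries.popleft()
```

With `data_expiration_seconds=60`, an entry written at time 0 is removed by `expire_data(now=120)`, while one written at time 100 is kept.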
@spec new(integer(), integer(), term(), term(), [term()], keyword()) :: {:ok, SnakeBridge.Ref.t()} | {:error, Snakepit.Error.t()}
Initialize self. See help(type(self)) for accurate signature.
Parameters
rank(integer())
world_size(integer())
store(term())
socket(term())
data_expiration_seconds(integer() default: 3600)
send_dst_counter(%{optional(integer()) => integer()} default: <factory>)
recv_src_counter(%{optional(integer()) => integer()} default: <factory>)
broadcast_send_counter(integer() default: 0)
broadcast_recv_src_counter(%{optional(integer()) => integer()} default: <factory>)
entries(term() default: <factory>)
@spec recv_obj(SnakeBridge.Ref.t(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Receive an object from a source rank.
Parameters
src(integer())
Returns
term()
@spec send_obj(SnakeBridge.Ref.t(), term(), integer(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Send an object to a destination rank.
Parameters
obj(term())
dst(integer())
Returns
term()
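Point-to-point send and receive over a key-value store can be sketched with per-peer counters. The Python example below assumes a plain dict as the store. The `PointToPointSketch` class and its key layout are hypothetical, and the counter names are borrowed from the `send_dst_counter` and `recv_src_counter` fields listed for this type.

```python
import pickle


class PointToPointSketch:
    """Hypothetical per-rank helper; a plain dict stands in for the store."""

    def __init__(self, store, rank):
        self.store = store
        self.rank = rank
        self.send_dst_counter = {}  # messages sent so far, per destination
        self.recv_src_counter = {}  # messages received so far, per source

    def send_obj(self, obj, dst):
        n = self.send_dst_counter.get(dst, 0)
        # Number each message so the receiver can read them in order.
        self.store[f"send/{self.rank}->{dst}/{n}"] = pickle.dumps(obj)
        self.send_dst_counter[dst] = n + 1

    def recv_obj(self, src):
        n = self.recv_src_counter.get(src, 0)
        # A real store would block here until the key appears.
        obj = pickle.loads(self.store[f"send/{src}->{self.rank}/{n}"])
        self.recv_src_counter[src] = n + 1
        return obj
```

Because sender and receiver advance matching counters, messages between a given pair of ranks are delivered in the order they were sent.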