OmniSafe Distributed#

setup_distributed()

Avoid slowdowns caused by each separate process's PyTorch using more than its fair share of CPU resources.

get_rank()

Get rank of calling process.

is_master()

Test whether the process is the root process.

world_size()

Count active MPI processes.

fork(parallel[, bind_to_core, ...])

The entry point for multi-processing.

avg_tensor(value)

Average a torch tensor over MPI processes.

avg_grads(module)

Average contents of gradient buffers across MPI processes.

sync_params(module)

Sync all parameters of module across all MPI processes.

avg_params(module)

Average contents of all parameters across MPI processes.

dist_avg(value)

Average a tensor over distributed processes.

dist_sum(value)

Sum a tensor over distributed processes.

dist_max(value)

Determine global maximum of tensor over distributed processes.

dist_min(value)

Determine global minimum of tensor over distributed processes.

dist_op(value, operation)

Apply a reduce operation to a tensor across distributed processes.

dist_statistics_scalar(value[, with_min_and_max])

Get the mean/std and optional min/max of a scalar value across MPI processes.

Set up distributed training#

Documentation

omnisafe.utils.distributed.setup_distributed()[source]#

Avoid slowdowns caused by each separate process's PyTorch using more than its fair share of CPU resources.

Return type:

None
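A minimal usage sketch (illustrative only, not taken from the library's own examples): it assumes setup_distributed() is called once in each worker after the script has been re-launched via fork(), so that every worker caps its PyTorch thread usage.

>>> # hypothetical call at the start of each worker process
>>> setup_distributed()
>>> # each worker's CPU thread count is now limited to its fair share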

omnisafe.utils.distributed.get_rank()[source]#

Get rank of calling process.

Return type:

int

Example

>>> # In process 0
>>> get_rank()
0
omnisafe.utils.distributed.is_master()[source]#

Test whether the process is the root process.

Return type:

bool
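Example (illustrative; assumes the same two-process setup as in get_rank() above, where the root process is the one with rank 0):

>>> # In process 0
>>> is_master()
True
>>> # In process 1
>>> is_master()
False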

omnisafe.utils.distributed.world_size()[source]#

Count active MPI processes.

Return type:

int
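Example (illustrative; assumes the script was launched with four parallel processes, e.g. via fork(4)):

>>> world_size()
4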

omnisafe.utils.distributed.fork(parallel, bind_to_core=False, use_number_of_threads=False, device='cpu', manual_args=None)[source]#

The entry point for multi-processing.

Re-launches the current script with workers linked by MPI and terminates the original process that launched it. Taken almost without modification from the Baselines function of the same name.

Note

Usage: if fork(n): sys.exit()

Parameters:
  • parallel (int) – number of processes to launch.

  • bind_to_core (bool, optional) – Defaults to False.

  • use_number_of_threads (bool, optional) – Defaults to False.

Return type:

bool
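A hedged usage sketch following the note above: the call re-launches the current script with the requested number of MPI workers, and the launching process exits (sys is assumed to be imported).

>>> import sys
>>> # re-launch this script with 4 MPI workers; the launching process exits
>>> if fork(4):
...     sys.exit()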

Tensor Operations#

Documentation

omnisafe.utils.distributed.avg_tensor(value)[source]#

Average a torch tensor over MPI processes.

Since torch and numpy share the same memory space, tensors of dim > 0 can be manipulated in place by reference, while scalars must be reassigned.

Example

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> avg_tensor(x)
>>> x
tensor(1.5)
Parameters:

value (torch.Tensor) – value to be averaged.

Return type:

None

omnisafe.utils.distributed.avg_grads(module)[source]#

Average contents of gradient buffers across MPI processes.

Note

This function only has an effect when training with multiple processes.

Example

>>> # In process 0
>>> model = torch.nn.Linear(1, 1, bias=False)
>>> model.weight.data = torch.tensor([[1.]])
>>> (model(torch.tensor([[1.]])) ** 2).sum().backward()
>>> model.weight.grad
tensor([[2.]])
>>> # In process 1
>>> model = torch.nn.Linear(1, 1, bias=False)
>>> model.weight.data = torch.tensor([[2.]])
>>> (model(torch.tensor([[1.]])) ** 2).sum().backward()
>>> model.weight.grad
tensor([[4.]])
>>> avg_grads(model)
>>> model.weight.grad
tensor([[3.]])
Parameters:

module (torch.nn.Module) – module whose gradient buffers will be averaged.

Return type:

None

omnisafe.utils.distributed.sync_params(module)[source]#

Sync all parameters of module across all MPI processes.

Note

This function only has an effect when training with multiple processes.

Example

>>> # In process 0
>>> model = torch.nn.Linear(1, 1)
>>> model.weight.data = torch.tensor([[1.]])
>>> model.weight.data
tensor([[1.]])
>>> # In process 1
>>> model = torch.nn.Linear(1, 1)
>>> model.weight.data = torch.tensor([[2.]])
>>> model.weight.data
tensor([[2.]])
>>> sync_params(model)
>>> model.weight.data
tensor([[1.]])
Parameters:

module (torch.nn.Module) – module to be synchronized.

Return type:

None

omnisafe.utils.distributed.avg_params(module)[source]#

Average contents of all parameters across MPI processes.

Example

>>> # In process 0
>>> model = torch.nn.Linear(1, 1)
>>> model.weight.data = torch.tensor([[1.]])
>>> model.weight.data
tensor([[1.]])
>>> # In process 1
>>> model = torch.nn.Linear(1, 1)
>>> model.weight.data = torch.tensor([[2.]])
>>> model.weight.data
tensor([[2.]])
>>> avg_params(model)
>>> model.weight.data
tensor([[1.5]])
Parameters:

module (torch.nn.Module) – module to be averaged.

Return type:

None

Distributed Operations#

Documentation

omnisafe.utils.distributed.dist_avg(value)[source]#

Average a tensor over distributed processes.

Return type:

Tensor

Example

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_avg(x)
tensor(1.5)
omnisafe.utils.distributed.dist_sum(value)[source]#

Sum a tensor over distributed processes.

Return type:

Tensor

Example

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_sum(x)
tensor(3.)
omnisafe.utils.distributed.dist_max(value)[source]#

Determine global maximum of tensor over distributed processes.

Return type:

Tensor

Example

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_max(x)
tensor(2.)
omnisafe.utils.distributed.dist_min(value)[source]#

Determine global minimum of tensor over distributed processes.

Return type:

Tensor

Example

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_min(x)
tensor(1.)
omnisafe.utils.distributed.dist_op(value, operation)[source]#

Apply a reduce operation to a tensor across distributed processes.

Note

The operation can be ReduceOp.SUM, ReduceOp.MAX, or ReduceOp.MIN, corresponding to dist_sum(), dist_max(), and dist_min(), respectively.

Parameters:
  • value (torch.Tensor) – value to be reduced.

  • operation (ReduceOp) – operation type.

Return type:

Tensor
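Example (illustrative; assumes ReduceOp is torch.distributed.ReduceOp and the same two-process setup used above):

>>> from torch.distributed import ReduceOp
>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_op(x, ReduceOp.SUM)
tensor(3.)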

omnisafe.utils.distributed.dist_statistics_scalar(value, with_min_and_max=False)[source]#

Get the mean/std and optional min/max of a scalar value across MPI processes.

Example

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_statistics_scalar(x)
(tensor(1.5), tensor(0.5))
Parameters:
  • value (torch.Tensor) – value to compute statistics over.

  • with_min_and_max (bool) – whether to return min and max.

Return type:

Tuple[Tensor, ...]
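For the optional flag, an illustrative sketch; the tuple ordering (mean, std, min, max) is an assumption for illustration, not stated by the signature.

>>> # In process 0
>>> x = torch.tensor(1.0)
>>> # In process 1
>>> x = torch.tensor(2.0)
>>> dist_statistics_scalar(x, with_min_and_max=True)
(tensor(1.5), tensor(0.5), tensor(1.), tensor(2.))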