PyTorch
PyTorch c10d
PyTorch 的 c10d
通信库是用于分布式计算的核心组件,特别是在需要跨多个设备(如多个GPU或多台机器)进行并行计算时。c10d
提供了分布式数据并行(Distributed Data Parallel, DDP)的底层实现,支持高效的数据同步和通信操作。
主要功能
通信后端:
c10d
支持多种通信后端(Backend),如:NCCL
: 适用于GPU间通信,特别是在NVIDIA硬件上。GLOO
: 适用于CPU和GPU的跨平台通信。MPI
: 基于Message Passing Interface,适合大规模分布式系统。
进程组(Process Group):
c10d
中的进程组是通信的基本单元,可以将多个进程组织成一个组,以便它们之间进行通信。可以在不同的进程组之间进行广播、归约、全归约等操作。广播和同步:
c10d
提供了对数据进行广播和同步的接口,确保在多个进程或设备之间一致地传递数据。例如,在多GPU训练中,同步不同GPU上的模型参数。梯度同步: 在分布式数据并行训练中,
c10d
自动同步各个设备计算出的梯度,从而确保在所有设备上更新后的模型参数保持一致。AllReduce操作:
c10d
支持AllReduce操作,这是在分布式训练中非常常用的操作,能够高效地合并不同设备上的梯度并更新模型参数。
使用场景
- 分布式训练:
c10d
被广泛应用于分布式深度学习训练中,特别是在需要处理大规模数据集或模型时,能够显著缩短训练时间。 - 多GPU训练:通过
c10d
,用户可以方便地将训练任务分布在多个GPU上,从而充分利用硬件资源。
示例代码
以下是一个简单的使用 c10d
进行广播操作的示例:
import torch
import torch.distributed as dist
# 初始化通信库,选择GLOO后端
dist.init_process_group(backend='gloo')
# 获取当前进程的rank
rank = dist.get_rank()
# 创建一个张量
tensor = torch.zeros(1)
if rank == 0:
tensor += 1 # 只有rank为0的进程修改张量
# 进行广播操作,将rank 0的张量值广播到所有进程
dist.broadcast(tensor, src=0)
print(f"Rank {rank} has tensor {tensor[0]}")
总结
c10d
是 PyTorch 中实现高效分布式计算的关键组件,通过提供灵活的通信后端和强大的同步机制,极大地简化了在多设备或多机器环境中进行并行计算的复杂性。
SPMD 模型
SPMD(Single Program, Multiple Data)模型是一种并行计算模型,在这种模型中,多个处理单元同时执行相同的程序代码,但每个处理单元处理不同的数据。这种模型广泛应用于高性能计算和分布式计算领域,特别适合需要大规模并行处理的任务。
SPMD 模型的核心思想
单一程序: 在 SPMD 模型中,所有处理单元(通常是不同的CPU或GPU)执行相同的程序代码。这意味着每个处理单元的代码路径是相同的,但可以根据处理单元的ID或索引来执行不同的操作。
多数据流: 尽管所有处理单元执行相同的程序代码,但每个处理单元处理的输入数据是不同的。这通常通过划分数据集来实现,每个处理单元处理其特定的数据子集。
并行性: SPMD 模型通过同时在多个处理单元上执行相同的程序代码来实现并行性。这种并行性可以大幅提高计算效率,特别是在处理大规模数据集时。
通信与同步: 在 SPMD 模型中,处理单元通常需要在某些阶段进行通信和同步。例如,处理单元可能需要共享部分计算结果或交换数据。为了确保数据的一致性,通常需要同步操作来协调各个处理单元的执行。
SPMD 与其他并行模型的比较
SIMD(Single Instruction, Multiple Data):与 SPMD 不同,SIMD 是在所有处理单元上执行相同的指令,并且每个处理单元处理不同的数据。在 SIMD 模型中,处理单元的操作是完全同步的。
MIMD(Multiple Instruction, Multiple Data):MIMD 模型允许每个处理单元执行不同的程序代码并处理不同的数据,与 SPMD 相比,MIMD 提供了更大的灵活性,但通常也更复杂。
应用场景
SPMD 模型在许多并行计算任务中得到了广泛应用,典型的应用场景包括:
分布式深度学习:在深度学习中,SPMD 模型通常用于分布式训练,每个处理单元(如 GPU)处理一部分数据集,并在训练过程中共享模型参数。
科学计算:许多科学计算任务涉及大规模矩阵或向量操作,可以通过 SPMD 模型并行化这些操作,从而显著提高计算速度。
数值模拟:SPMD 模型在气候模拟、物理仿真等领域中也得到广泛应用,允许并行处理不同区域或不同时间步的计算。
示例
在分布式训练中,使用 SPMD 模型的一个简单示例可能是,每个 GPU 处理一个 mini-batch 的数据,然后通过通信操作将所有 GPU 的梯度进行合并并同步模型参数。
总结
SPMD 模型是一种高效的并行计算模型,通过让多个处理单元执行相同的程序代码并处理不同的数据,能够大幅提高计算效率。它的简洁性和高效性使其成为许多并行和分布式计算任务的首选模型。
MPMD
MPMD(Multiple Program, Multiple Data)是一种并行计算模型,与 SPMD(Single Program, Multiple Data)模型相对应。在 MPMD 模型中,多个处理单元(如 CPU 核心或计算节点)可以执行不同的程序,并处理不同的数据集。这种模型适用于更复杂的并行计算任务,特别是在各个处理单元需要执行不同类型的计算时。
MPMD 模型的核心思想
多个程序: 与 SPMD 模型不同,在 MPMD 模型中,每个处理单元可以运行不同的程序代码。这种灵活性允许处理单元根据任务需求执行不同的操作或算法。
多数据流: 与多个程序相对应,每个处理单元处理不同的数据集。数据的划分方式可以根据具体应用进行调整,以实现负载均衡和计算资源的优化利用。
并行性: MPMD 模型通过允许不同的处理单元并行执行不同的任务,最大化了计算资源的利用率。每个处理单元都可以独立处理自己的任务,互不干扰。
通信与同步: 尽管处理单元运行不同的程序,但它们仍可能需要在某些阶段进行通信和数据交换。MPI(Message Passing Interface)等并行计算框架通常支持 MPMD 模型,通过消息传递机制实现进程间的通信与同步。
MPMD 与 SPMD 的比较
灵活性:MPMD 提供了更大的灵活性,因为每个处理单元可以执行不同的程序。相比之下,SPMD 模型中所有处理单元执行相同的程序,只是数据不同。
复杂性:由于 MPMD 允许不同的程序同时运行,因此任务协调和进程管理可能更加复杂。处理单元之间的通信需求可能更高,且需要精心设计。
应用场景:MPMD 更适合那些需要同时执行多种任务的复杂应用。例如,某些科学计算或仿真任务可能需要不同的计算步骤,这些步骤可以由不同的程序在不同的处理单元上并行执行。
应用场景
多物理场仿真:在一些复杂的物理仿真中,可能涉及多个物理场(如流体动力学和热传导)的计算。这些场的计算方法不同,可以在不同的处理单元上并行运行各自的程序。
复杂工作流处理:在需要并行处理不同任务的工作流中,如在大规模数据处理或分析任务中,不同的处理单元可以负责不同的任务或数据集,从而加快整个工作流的执行。
异构计算:在涉及多种计算架构(如 CPU 和 GPU)的应用中,MPMD 模型可以将不同的程序分配给不同的架构,以充分利用各自的计算优势。
示例场景
假设有一个科学模拟项目,需要同时计算两个独立的物理过程。第一个进程在 CPU 上运行大气模拟程序,而第二个进程在 GPU 上运行海洋模拟程序。这两部分模拟数据可能会在某些阶段相互依赖,因此需要通过通信机制进行数据交换。使用 MPMD 模型可以分别启动两个不同的程序,并通过 MPI 等工具进行协调和通信。
实现 MPMD 的框架
MPI 是最常用的实现 MPMD 模型的框架之一。MPI 允许用户在同一个应用程序中启动多个不同的程序实例,每个实例可以有自己的任务和数据,同时提供通信接口以在它们之间进行消息传递。
总结
MPMD(Multiple Program, Multiple Data)模型是一种强大的并行计算模型,允许多个处理单元并行执行不同的程序,并处理不同的数据集。它的灵活性使其非常适合处理复杂的并行任务,特别是在不同任务需要不同计算资源或算法的情况下。MPI 等并行计算框架为 MPMD 模型提供了实现途径,广泛应用于科学计算、数据分析和异构计算等领域。
MPI
MPI(Message Passing Interface)是一种用于并行计算的标准接口,广泛应用于分布式计算和高性能计算(HPC)领域。MPI 提供了一组标准的 API,允许程序在多个处理单元(如 CPU 核心或计算节点)之间进行消息传递和协调,从而实现大规模并行计算。
MPI 的核心概念
进程模型: MPI 采用进程并行模型,即每个并行任务在一个独立的进程中运行。进程之间通过消息传递进行通信,而不是共享内存。这种设计使得 MPI 非常适合在分布式内存系统(如计算集群)上运行。
通信机制:
- 点对点通信:两个进程之间直接进行消息传递,典型的函数有
MPI_Send
和MPI_Recv
,分别用于发送和接收消息。 - 集体通信:MPI 提供了多种集体通信操作,如广播(broadcast)、归约(reduction)、散播(scatter)和聚集(gather),这些操作涉及多个进程之间的通信。
- 点对点通信:两个进程之间直接进行消息传递,典型的函数有
进程组和通信域: MPI 中的进程可以组织成进程组,每个进程组拥有一个通信域(communicator)。
MPI_COMM_WORLD
是所有 MPI 程序默认的通信域,包含了所有进程。用户也可以定义自己的进程组和通信域,以便更灵活地管理和组织进程。同步与异步通信: MPI 支持同步和异步的消息传递模式。同步通信要求发送方等待接收方确认消息接收,确保消息传递的顺序和一致性;异步通信则允许发送方在消息发送后立即继续执行,而不必等待接收方确认。
并行I/O: MPI-IO 是 MPI 的一个子集,专门用于并行文件输入输出操作。MPI-IO 允许多个进程同时读写大规模数据集,为并行计算中的数据管理提供了高效的解决方案。
MPI 的主要功能
并行计算:MPI 允许程序在多个处理单元上并行运行,从而加速计算任务的执行。适用于数值模拟、数据处理等需要处理大量计算的场景。
跨平台兼容:MPI 是一种标准接口,支持多种硬件和操作系统平台。常见的 MPI 实现包括 OpenMPI、MPICH 等,它们在不同的计算环境中均能有效运行。
高效的消息传递:MPI 针对不同的通信模式进行了优化,能够在高性能计算环境中提供低延迟和高带宽的通信服务。
使用场景
数值模拟:在流体力学、气象预报、天体物理等领域,MPI 常用于实现复杂的数值模拟,这些任务通常需要处理非常大的数据集和复杂的计算过程。
数据分析:在大规模数据分析任务中,MPI 可以将计算任务分布到多个节点,从而提高数据处理的速度和效率。
机器学习和深度学习:MPI 也可以用于分布式机器学习,尤其是在需要跨多个节点或多个 GPU 进行训练的情况下,通过 MPI 实现模型的并行训练和参数同步。
示例代码
以下是一个简单的 MPI 程序示例,展示了如何使用 MPI 实现基本的点对点通信:
#include <mpi.h>
#include <stdio.h>
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
if (world_rank == 0) {
int data = 100;
MPI_Send(&data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
printf("Process 0 sent data %d to process 1\n", data);
} else if (world_rank == 1) {
int data;
MPI_Recv(&data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("Process 1 received data %d from process 0\n", data);
}
MPI_Finalize();
return 0;
}
总结
MPI 是并行计算领域的重要工具,提供了灵活且高效的消息传递机制,使得程序能够在多处理单元环境中高效运行。MPI 的标准化设计和广泛支持使其成为高性能计算、分布式计算和大规模数据处理任务中的主流选择。
源码解读
init_process_group
torch\distributed\distributed_c10d.py
@_exception_logger
@_time_logger
def init_process_group(
backend: Optional[str] = None,
init_method: Optional[str] = None,
timeout: Optional[timedelta] = None,
world_size: int = -1,
rank: int = -1,
store: Optional[Store] = None,
group_name: str = "",
pg_options: Optional[Any] = None,
device_id: Optional[torch.device] = None,
) -> None:
以下是函数 init_process_group
的翻译:
初始化默认的分布式进程组。
这也将初始化分布式包。
初始化进程组有两种主要方式:
- 明确指定
store
、rank
和world_size
。 - 指定
init_method
(一个 URL 字符串),用于指示在哪里/如何发现同伴。可以选择性地指定rank
和world_size
,或在 URL 中编码所有必需参数并省略它们。
如果两者都未指定,默认 init_method
为 "env://"。
参数说明:
backend
(字符串或 Backend,可选):指定使用的后端。根据构建时的配置,有效值包括mpi
、gloo
、nccl
和ucc
。如果未提供后端,将创建gloo
和nccl
两个后端。请注意,如果使用多个进程并且使用nccl
后端,每个进程必须独占其使用的每个 GPU,否则可能会导致死锁。ucc
后端为实验性特性。init_method
(字符串,可选):指定如何初始化进程组的 URL。默认值为 "env://"(如果未指定init_method
或store
)。与store
互斥。world_size
(整数,可选):参与任务的进程数量。如果指定了store
,这是必需的。rank
(整数,可选):当前进程的编号(应为 0 到world_size
-1 之间的数字)。如果指定了store
,这是必需的。store
(Store,可选):可供所有工作者访问的键值存储,用于交换连接/地址信息。与init_method
互斥。timeout
(timedelta,可选):在进程组上执行操作的超时时间。NCCL 后端的默认值为 10 分钟,其他后端为 30 分钟。这是异步中止集合操作的时间限制,操作失败后进程将崩溃。由于 CUDA 的异步执行,无法安全继续执行用户代码,可能导致后续的 CUDA 操作在损坏的数据上运行。当TORCH_NCCL_BLOCKING_WAIT
被设置时,进程将阻塞并等待此超时时间。group_name
(字符串,可选,已弃用):进程组名称,此参数已被忽略。pg_options
(ProcessGroupOptions,可选):用于构建特定进程组的其他选项。目前支持的选项是ProcessGroupNCCL.Options
,如is_high_priority_stream
可指定 NCCL 后端使用高优先级的 CUDA 流。其他配置选项请参考 NVIDIA NCCL 文档。device_id
(torch.device,可选):用于“绑定”当前进程的特定设备,允许进行后端优化。目前在 NCCL 下有两个效果:1) 通信器立即初始化,而不是通常的延迟调用;2) 当可能时,子组将使用ncclCommSplit
以避免不必要的组创建开销。如果想提前知道 NCCL 初始化错误,也可以使用该参数。
Backend
class Backend(str):
"""
An enum-like class for backends.
Available backends: GLOO, NCCL, UCC, MPI, and other registered backends.
The values of this class are lowercase strings, e.g., ``"gloo"``. They can
be accessed as attributes, e.g., ``Backend.NCCL``.
This class can be directly called to parse the string, e.g.,
``Backend(backend_str)`` will check if ``backend_str`` is valid, and
return the parsed lowercase string if so. It also accepts uppercase strings,
e.g., ``Backend("GLOO")`` returns ``"gloo"``.
.. note:: The entry ``Backend.UNDEFINED`` is present but only used as
initial value of some fields. Users should neither use it directly
nor assume its existence.
"""
Backend 类
一个类似枚举的类,用于表示后端类型。
可用的后端包括:GLOO
、NCCL
、UCC
、MPI
以及其他已注册的后端。
该类的值是小写字符串,例如 "gloo"
。可以通过属性访问,例如 Backend.NCCL
。
此类还可以直接调用来解析字符串,例如,Backend(backend_str)
将检查 backend_str
是否有效,如果有效,返回解析后的小写字符串。它还接受大写字符串,例如,Backend("GLOO")
将返回 "gloo"
。
注意:
条目 Backend.UNDEFINED
存在,但仅作为某些字段的初始值。用户不应直接使用它,也不应假设它的存在。
这个 backend_capability
字典用于定义不同后端的支持能力。它以后端名称作为键,以支持的设备类型列表作为值。
例如:
GLOO
后端支持cpu
和cuda
。NCCL
后端只支持cuda
。UCC
和MPI
后端都支持cpu
和cuda
。
backend_capability: Dict[str, List[str]] = {
GLOO: ["cpu", "cuda"], # GLOO 后端支持 CPU 和 CUDA
NCCL: ["cuda"], # NCCL 后端只支持 CUDA
UCC: ["cpu", "cuda"], # UCC 后端支持 CPU 和 CUDA
MPI: ["cpu", "cuda"], # MPI 后端支持 CPU 和 CUDA
}
这个字典可以用于在代码中根据所选择的后端来决定可以在哪些设备上运行操作。