# [Source code analysis] PyTorch distributed (10) -- the static architecture of DistributedDataParallel's Reducer

catalogue

- [Source code analysis] PyTorch distributed (10) -- the static architecture of DistributedDataParallel's Reducer
- 0x00 summary
- 0x01 introduction
- 0x02 Reducer definition
- 0x03 Bucket
- 0x03 BucketReplica
- 0x04 query class
- 0x05 cumulative correlation class
- 0xFF reference

## 0x00 summary

Through the previous analyses, we have learned the basic architecture of DDP and how it is initialized. This article looks at the static architecture of its core, the Reducer. The Reducer provides the core implementation of gradient synchronization in backward propagation.

Other articles in this series are as follows:

Automatic differentiation of deep learning tools (1)

Automatic differentiation of deep learning tools (2)

[Source code analysis] automatic differentiation of deep learning tools (3) -- example interpretation

[Source code analysis] how PyTorch implements forward propagation (1) -- basic class (I)

[Source code analysis] how PyTorch implements forward propagation (2) -- basic classes (Part 2)

[Source code analysis] how PyTorch implements forward propagation (3) -- specific implementation

[Source code analysis] how PyTorch implements backward propagation (1) -- call engine

[Source code analysis] how PyTorch implements backward propagation (2) -- engine static structure

[Source code analysis] how PyTorch implements backward propagation (3) -- engine dynamic logic

[Source code analysis] how PyTorch implements backward propagation (4) -- specific algorithm

[Source code analysis] PyTorch distributed (1) -- history and overview

[Source code analysis] PyTorch distributed (2) -- DataParallel (Part 1)

[Source code analysis] PyTorch distributed (3) -- DataParallel (Part 2)

[Source code analysis] PyTorch distributed (4) -- basic concepts of distributed applications

[Source code analysis] PyTorch distributed (5) -- overview of DistributedDataParallel & how to use it

[Source code analysis] PyTorch distributed (6) -- DistributedDataParallel -- initialization & store

[Source code analysis] PyTorch distributed (7) -- process group of DistributedDataParallel

[Source code analysis] PyTorch distributed (8) -- DistributedDataParallel

[Source code analysis] PyTorch distributed (9) -- initialization of DistributedDataParallel

## 0x01 introduction

### 1.1 call

The creation code of Reducer is as follows; it lives in _ddp_init_helper.

```python
# Note: reverse list of buckets because we want to approximate the
# order in which their gradients are produced, and assume they
# are used in the forward pass in the order they are defined.
self.reducer = dist.Reducer(
    parameters,  # parameters[0] is a list of tensors
    list(reversed(bucket_indices)),  # Bucket information
    self.process_group,
    expect_sparse_gradient,
    self.bucket_bytes_cap,
    self.find_unused_parameters,
    self.gradient_as_bucket_view,
    param_to_name_mapping,
)
```

The arguments of the call look as follows. parameters[0] is the parameter list of model replica 0; as you can see, only element [0] is meaningful. parameters[0] itself contains 20 elements:

```
parameters = {list: 1}
 0 = {list: 4}
  0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02, 3.8828e-02, 1 )
  1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033, 0.2771, 0.0721, )
  2 = {Parameter: 5} Parameter containing:\ntensor([[-0.0094, -0.1319, 0.0713, 0.3155, )
  3 = {Parameter: 5} Parameter containing:\ntensor([-0.0008, 0.0582, -0.1245, -0.2538, )
  ...
  20 = {Parameter: 5} Parameter containing:\ntensor([-0.0008, 0.0582, -0.1245, -0.2538, )
  __len__ = {int} 20
 __len__ = {int} 1
```

An example of bucket_indices is as follows:

For tensor indices, we give every tensor an index, increasing from 0 up to tensors.size(). If the model's parameters comprise 20 tensors in total, the tensor indices 0 through 19 are divided into six buckets. Across these six buckets, each tensor index is unique and appears exactly once.

```
+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
|                                                                       |
|  <tensor index 4, tensor index 5, tensor 6>                           |
|                                                                       |
|  ......                                                               |
|                                                                       |
|  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
|                                                                       |
+-----------------------------------------------------------------------+
```
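To make the partition above concrete, here is a purely illustrative Python sketch (not the actual C++ implementation, whose capacity logic and ordering are more involved) that greedily packs tensor indices into buckets under a size cap, producing non-overlapping index groups like those shown. The `sizes` values are made up for the example.

```python
# Hypothetical sketch: pack tensor indices into buckets under a byte cap.
# Real DDP computes sizes from tensor numel and dtype, and its bucketing
# logic is more elaborate (it also reverses the bucket order).
def assign_buckets(sizes, cap):
    buckets, current, used = [], [], 0
    for idx, size in enumerate(sizes):
        if current and used + size > cap:
            buckets.append(current)  # current bucket is full, start a new one
            current, used = [], 0
        current.append(idx)
        used += size
    if current:
        buckets.append(current)
    return buckets

sizes = [10, 10, 5, 5, 20, 15, 5]  # toy per-tensor sizes
print(assign_buckets(sizes, 30))   # → [[0, 1, 2, 3], [4], [5, 6]]
```

Each index lands in exactly one bucket, which matches the "unique and appears exactly once" property described above.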

The Python side is just a pybind11 stub, so we have to look at the C++ code.

```python
class Reducer(__pybind11_builtins.pybind11_object):
    def __init__(self, replicas, *args, **kwargs):
        """
        __init__(self: torch._C._distributed_c10d.Reducer,
                 replicas: List[List[at::Tensor]],
                 bucket_indices: List[List[int]],
                 process_group: c10d::ProcessGroup,
                 expect_sparse_gradients: List[List[bool]] = [],
                 bucket_bytes_cap: int = 26214400,
                 find_unused_parameters: bool = False,
                 gradient_as_bucket_view: bool = False,
                 param_to_name_mapping: Dict[int, str] = {}) -> None
        """
        pass
```

So we came to torch/lib/c10d/reducer.h and torch/lib/c10d/reducer.cpp.

## 0x02 Reducer definition

Reducer provides the core implementation of gradient synchronization in backward propagation. Its definition is quite complex; some less important member variables have even been removed here to keep it readable:

```cpp
class Reducer {
 public:
  // The constructor takes a list of variables for every model replica.
  // The bucket assignment for this reducer is specified as a list of
  // buckets, each of which is specified as a list of indices into the
  // variables list for **a single replica** (i.e. `variables[0]`).
  explicit Reducer(
      std::vector<std::vector<at::Tensor>> replicas,
      std::vector<std::vector<size_t>> bucket_indices,
      c10::intrusive_ptr<c10d::ProcessGroup> process_group,
      std::vector<std::vector<bool>> expect_sparse_gradients,
      int64_t bucket_bytes_cap,
      bool find_unused_parameters,
      bool gradient_as_bucket_view,
      std::unordered_map<size_t, std::string> paramNames);

 protected:
  // Forward declaration.
  struct Bucket;

  void push_rebuilt_params(const VariableIndex& index);

  mutable std::mutex mutex_;
  const std::vector<std::vector<at::Tensor>> replicas_;
  const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_;
  std::vector<std::vector<bool>> expect_sparse_gradients_;

  std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
      grad_accumulators_;
  std::unordered_map<torch::autograd::Node*, VariableIndex>
      gradAccToVariableMap_;
  std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
      hooks_;

  bool expect_autograd_hooks_;
  bool require_finalize_;
  size_t next_bucket_;

  bool has_marked_unused_parameters_;
  const bool find_unused_parameters_;
  const bool gradient_as_bucket_view_;
  // Unused parameters are marked ready directly; after the first iteration
  // this rarely changes.
  std::vector<VariableIndex> unused_parameters_;

  // Locally used parameter maps indicating if parameters are used locally
  // during the current iteration or no_sync session if no_sync is on. One
  // tensor for each model replica and each tensor is one-dim int32 tensor of
  // number of parameters. These tensors are marked in autograd_hook to
  // indicate the corresponding param has been used, and get allreduced in the
  // end of backward of current iteration or no_sync session for figuring out
  // the globally unused parameters.
  //
  // local_used_maps_:     CPU tensors for bookkeeping locally used params
  // local_used_maps_dev_: dev tensors for reducing globally unused params
  std::vector<at::Tensor> local_used_maps_;
  std::vector<at::Tensor> local_used_maps_dev_;
  // Indicate that reduction is done and D2H copy is done as well.
  bool local_used_maps_reduced_;

  using GradCallback =
      torch::distributed::autograd::DistAutogradContext::GradCallback;

  // A bucket replica represents [1..N] gradients to be reduced,
  // with the same dtype, on the same device.
  //
  // Batching gradients together before reducing them can result in lower
  // overhead and/or faster time to completion. Only gradients of the same
  // type and on the same device can be batched. The tensor that represents
  // the flattened gradient uses the same type and is placed on the same
  // device. Buckets are filled as the gradients they hold are computed
  // (triggered by autograd hooks). Buckets are reduced in a predetermined
  // order that is identical across processes.
  struct BucketReplica {
    // Flattened (1 dimensional) contents of bucket.
    at::Tensor contents;

    // Views into contents for each grad.  Each view will be created with
    // layout (sizes + strides) matching the grad's expected layout
    // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
    // `bucket_views_in[i].copy_(grad)` and
    // `grad.copy_(bucket_views_out[i])`
    // provide convenient ways to move grad data in/out of contents.
    // The reason we keep two states for bucket_views is that if DDP
    // communication hook was registered, `bucket_views_out` could be
    // re-initialized with the value of hook's `future_work`. We still need to
    // keep a separate view reference to replica's original contents for
    // `bucket_views_in[i].copy_(grad)` call.
    std::vector<at::Tensor> bucket_views_in;
    std::vector<at::Tensor> bucket_views_out;

    // Variables that contribute to this bucket replica. Use refcounted value
    // here so that we can easily unflatten the bucket contents into the
    // participating variables after reduction has completed.
    std::vector<at::Tensor> variables;

    // Per-variable offset/length into the flat bucket contents tensor and
    // grad bucket.
    std::vector<size_t> offsets;
    std::vector<size_t> lengths;

    // Per-variable sizes into the grad bucket.
    std::vector<c10::IntArrayRef> sizes_vec;

    // Number of tensors to be added before this bucket is complete.
    // This is reset to `variables.size()` every iteration.
    size_t pending;

    // TODO(@pietern)
    // Memory copies from gradient tensors into the bucket are potentially
    // done on different CUDA streams. We record an event for every copy
    // so that we can synchronize with them prior to kicking off the
    // reduction.
    // std::vector<at::cuda::CUDAEvent> events;
  };

  // A bucket holds N bucket replicas (1 per model replica).
  //
  // If every bucket in this struct is ready, the reduction can be kicked off.
  // One bucket per replica. Reduction is kicked off when every bucket is
  // ready.
  //
  struct Bucket {
    std::vector<BucketReplica> replicas;

    // Global indices of participating variables in the bucket
    std::vector<size_t> variable_indices;

    // Number of replicas to be marked done before this bucket is ready.
    size_t pending;

    // Keep work handle around when this set of buckets is being reduced.
    c10::intrusive_ptr<c10d::ProcessGroup::Work> work;

    // Keep future work handle around if DDP comm hook is registered.
    c10::intrusive_ptr<torch::jit::Future> future_work;

    // If this bucket should expect a single sparse gradient.
    // Implies: replicas[i].variables.size() == 1.
    bool expect_sparse_gradient = false;
  };
  std::vector<Bucket> buckets_;

  // A variable locator locates a particular variable in the bucket
  // structure. The `bucket_index` field points to the bucket in the
  // `buckets_` vector. The `intra_bucket_index` field points to the index of
  // the variable in any of the vector fields in the bucket replica.
  struct VariableLocator {
    // Index into the `buckets_` variable.
    size_t bucket_index;
    // Index of parameter in single bucket replica.
    size_t intra_bucket_index;

    VariableLocator() = default;
    VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
      bucket_index = bucket_index_;
      intra_bucket_index = intra_bucket_index_;
    }
  };

  // Map the index of a variable to its location in the bucket structure.
  std::vector<VariableLocator> variable_locators_;

  // track the number of iterations to synchronize grads in training so far.
  long num_iterations_;
  // track the number of buckets that have been ready for
  // communication calls like allReduce or communication hooks.
  int num_buckets_ready_;

  // We collect the relative timestamp of every gradient being ready
  // when executing autograd. This can be used to derive a timeline of
  // the point in time buckets were ready, or ideal bucket
  // assignment/ordering.
  std::vector<std::vector<int64_t>> backward_stats_;

  int ddp_runtime_logging_sample_rate_ = kDDPRuntimeLoggingSampleRate;

  bool is_multi_device_module_ = false;

  // Following variables are to help build dynamic bucket order
  bool has_rebuilt_bucket_;
  std::vector<at::Tensor> rebuilt_params_;
  std::vector<int64_t> rebuilt_param_indices_;
  const int64_t bucket_bytes_cap_;

  struct RpcContext {
    using ContextPtr = torch::distributed::autograd::ContextPtr;
    // The shared_ptr is to hold the context instance.
    ContextPtr context_ptr_holder;
    std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

    void set(ContextPtr&& new_context_ptr);
  };
  RpcContext rpc_context_;

  // A struct containing work handle and tensor for allreduce scheduled in
  // forward pass, if applicable.
  struct ForwardPassAllreduceWork {
    c10::intrusive_ptr<c10d::ProcessGroup::Work> workHandle;
    at::Tensor resultTensor;
    // whether we should divide by the initial world_size or the no. of
    // remaining DDP ranks.
    bool useStaticWorldSize;
  };

  // Handle for the currently scheduled allreduce in the forward pass, if
  // applicable.
  ForwardPassAllreduceWork forwardPassWorkHandle_;

  // Division factor for reduction of gradients.
  int divFactor_;

  bool static_graph_;

  // Key: VariableIndex, Value: the number of times that a variable's
  // autograd_hook() should be triggered before marking this variable's grad
  // as ready for communication. Map will not change after 1st iteration.
  std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
      numGradHooksTriggeredMap_;
  // Key: VariableIndex, Value: the number of times that a variable's
  // autograd_hook() are left to be triggered before marking this variable's
  // grad as ready for communication. Map will change after 1st iteration to
  // track a grad is ready for communication or not.
  std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
      numGradHooksTriggeredMapPerIteration_;

 private:
  // comm_hook_ is used to access the DDP communication hook if registered.
  std::unique_ptr<CommHookInterface> comm_hook_;
  // Current thread local state
  at::ThreadLocalState thread_local_state_;
  // Debug level setting. It is parsed once when Reducer is constructed, and
  // remains the same across a single invocation of DDP training.
  DistributedDebugLevel ddp_debug_level_;
  // Mapping of variable index to fully qualified name of model to notify
  // users about errors when certain parameters do not get gradient.
  std::unordered_map<size_t, std::string> param_names_;
  // Per iteration set of parameter indices that have been marked ready.
  std::unordered_set<size_t> perIterationReadyParams_;
  // Retrieves parameter names that have not been marked as ready as part of
  // previous iteration.
  std::vector<std::string> getUnmarkedParamsForIteration();
  // Retrieves parameter indices that have not been marked as ready as part
  // of previous iteration.
  std::vector<size_t> getUnmarkedParamIndicesForIteration();
  // Raises appropriate error if mark_variable_ready is called on the same
  // variable twice, which is unexpected.
  void checkAndRaiseMarkedTwiceError(size_t curVariableIndex);

  friend class Logger;
};
```

The key member variables of Reducer are as follows.

```cpp
std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
    grad_accumulators_; // Index i stores the grad accumulator for tensor index i
std::unordered_map<torch::autograd::Node*, VariableIndex>
    gradAccToVariableMap_; // Maps each grad accumulator back to its variable index, so unused parameters can be located in the autograd graph later
std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
    hooks_;
std::vector<Bucket> buckets_;
const std::vector<std::vector<at::Tensor>> replicas_; // The tensors passed in
const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_; // Process group
```

Next, we analyze these member variables one by one.

## 0x03 Bucket

### 3.1 design

Batching gradients together before reducing them can lower overhead and/or speed up completion time. However, only gradients of the same type on the same device can be batched.

A Bucket is a collection of gradients: gradients of the same type on the same device are placed into the same bucket. In the code, the Bucket struct embodies this bucket concept.

In each backward propagation, the tensors of all parameter gradients are copied into buckets, and after AllReduce the averaged gradients are copied back out. To speed up the copy operations, buckets are always created on the same device as the parameters. If the model spans multiple devices, DDP takes device affinity into account to ensure that all parameters in the same bucket are located on the same device. The order of AllReduce also affects performance, because it determines how much communication can overlap with computation. DDP starts AllReduce in the reverse order of model.parameters().
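A toy sketch of the "same type, same device" constraint (purely illustrative; real DDP does this in C++ during bucket assignment, reading device and dtype from the actual tensors) groups gradients by a (device, dtype) key so that each bucket only ever batches compatible tensors. The descriptors below are made up for the example.

```python
from collections import defaultdict

# Hypothetical (device, dtype) descriptors for a model's gradients.
grads = [
    ("cuda:0", "float32"), ("cuda:0", "float32"),
    ("cuda:0", "float16"), ("cpu", "float32"),
]

groups = defaultdict(list)
for idx, key in enumerate(grads):
    # Only same-device, same-dtype gradients may share a bucket.
    groups[key].append(idx)

print(dict(groups))
```

Within each group, the byte-cap-based packing described earlier would then split the indices into one or more buckets.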

### 3.2 definitions

#### 3.2.1 How many BucketReplicas are there?

For a better explanation, let's first analyze what BucketReplica is, starting with the comments in the source.

First of all, a Bucket holds multiple BucketReplicas, and each model replica corresponds to one BucketReplica.

```cpp
// A bucket holds N bucket replicas (1 per model replica).
```

However, only element [0] is actually used. Since single-process multi-device mode is not supported at present, we can assume there is only one replica in each bucket.

```cpp
GradBucket grad_bucket(
    next_bucket_,
    tensors[0],
    // The comment here indicates that SPMD is not supported:
    // Since currently we do not support single-process multiple-device
    // mode, we can assume only one replica in the bucket.
    bucket.replicas[0].offsets,
    bucket.replicas[0].lengths,
    bucket.replicas[0].sizes_vec);
bucket.future_work = comm_hook_->runHook(grad_bucket);
```

Combined with the code below, we can see that SPMD will no longer be supported in the future. parameters holds the parameter sets for the module list [ToyModel], so parameters[0] is the parameter set of the ToyModel itself.

```python
# The following comment indicates that SPMD will not be supported in the future
# TODO(wayi@): Remove this field since SPMD is no longer supported,
# and also remove all the relevant unnecessary loops.
# Module replication within process (single-process multi device)
self._module_copies = [self.module]  # Build a list such as [ToyModel]

# Build parameters for reducer.
parameters, expect_sparse_gradient = self._build_params_for_reducer()
```

Based on the above, we know:

- DDP originally intended to support SPMD like DP, so each process would need to maintain the parameters of multiple model copies across multiple GPUs. That is why parameters is an array, each element of which is the parameter set of one model copy.
- parameters is assigned to Reducer.replicas_, and Reducer.replicas_ is in turn used to populate bucket.replicas.
- Because SPMD will no longer be supported, only parameters[0] is meaningful.

So we conclude:

- BucketReplica is a group of gradients of one model replica to be reduced. A replica corresponds to (part of) the parameters of the model copy on one device (GPU): a replica represents [1..N] gradients to be reduced, which share the same dtype and live on the same device.
- In practice only bucket.replicas[0] is meaningful; it corresponds to some of the gradient-requiring tensors of [self.module] in the code above, i.e. parameters[0].

#### 3.2.2 Key member variables

Let's summarize the key member variables of Bucket:

- `std::vector<BucketReplica> replicas`: one BucketReplica per model replica.
- `std::vector<size_t> variable_indices`: the global indices of the variables that participate in this bucket.
- `size_t pending`: the number of replicas still to be marked done before this bucket is ready.
- `c10::intrusive_ptr<c10d::ProcessGroup::Work> work`: the work handle kept around while this set of buckets is being reduced.
- `c10::intrusive_ptr<torch::jit::Future> future_work`: the future work handle kept around if a DDP comm hook is registered.
- `bool expect_sparse_gradient`: whether this bucket expects a single sparse gradient.

#### 3.2.3 specific definitions

Finally, Bucket is defined as follows:

```cpp
// A bucket holds N bucket replicas (1 per model replica).
//
// If every bucket in this struct is ready, the reduction can be kicked off.
// One bucket per replica. Reduction is kicked off when every bucket is ready.
//
struct Bucket {
  std::vector<BucketReplica> replicas; // One BucketReplica per model replica

  // Global indices of participating variables in the bucket
  std::vector<size_t> variable_indices; // Which variables are in this bucket

  // Number of replicas to be marked done before this bucket is ready.
  size_t pending; // Countdown

  // Keep work handle around when this set of buckets is being reduced.
  c10::intrusive_ptr<c10d::ProcessGroup::Work> work;

  // Keep future work handle around if DDP comm hook is registered.
  c10::intrusive_ptr<torch::jit::Future> future_work;

  // If this bucket should expect a single sparse gradient.
  // Implies: replicas[i].variables.size() == 1.
  bool expect_sparse_gradient = false;
};
```

### 3.3 setting

The member variable buckets_ of Reducer is the key: it holds all the buckets in the Reducer.

```cpp
std::vector<Bucket> buckets_;
```

The initialization function initialize_buckets shows how buckets_ is built. Its core steps are:

- For each bucket, look up its tensor indices in bucket_indices.
- Find the tensors corresponding to those indices in parameters.
- Configure these tensors into a BucketReplica; they are the tensors this bucket will reduce.

```cpp
void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {
  buckets_.reserve(bucket_count);
  for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) {
    Bucket bucket;

    // Variables that expect sparse gradients must have their own bucket.
    if (bucket_indices[bucket_index].size() == 1) {
      const auto variable_index = bucket_indices[bucket_index].front();
      bucket.expect_sparse_gradient = // Configure the bucket
          expect_sparse_gradients_[0][variable_index];
    }

    // Iterate over model replicas.
    for (size_t replica_index = 0; replica_index < replica_count;
         replica_index++) {
      BucketReplica replica; // Set up the replica

      if (bucket.expect_sparse_gradient) {
        const auto variable_index = bucket_indices[bucket_index].front();
        // Find the tensor corresponding to the index
        const auto& variable = replicas_[replica_index][variable_index];
        replica.variables = {variable};
      } else {
        // Iterate over bucket variables.
        for (const auto variable_index : bucket_indices[bucket_index]) {
          // Find the tensor corresponding to the index
          const auto& variable = replicas_[replica_index][variable_index];
          if (!options.has_device()) {
            options = options.device(variable.device());
          }
          if (!options.has_dtype()) {
            options = options.dtype(variable.dtype());
          }
          const auto length = variable.numel();
          replica.variables.push_back(variable); // Insert the tensor
          replica.offsets.push_back(offset);
          replica.lengths.push_back(length);
          replica.sizes_vec.push_back(variable.sizes());
          offset += length;
        }

        // Allocate bucket contents tensor.
        initialize_bucket_views(replica, replica.contents);
      }

      // Add bucket replica to enclosing bucket.
      bucket.replicas.push_back(std::move(replica)); // Configure the bucket
    }
    bucket.variable_indices = std::move(bucket_indices[bucket_index]);
    buckets_.push_back(std::move(bucket)); // Insert into the bucket list
  }
}
```

As shown in the figure below, assume bucket_index is 1, i.e. the second bucket. Its variable_indices corresponds to the matching entry in bucket_indices. For example, BucketReplica[0] holds Tensor 4, 5 and 6, and variable_indices holds the indices of tensors 4, 5 and 6 respectively.

bucket_indices in the figure below is one of the parameters of the Reducer constructor.

```
+--------------------------------+     +------------------------------------+
| Reducer                        |     |                                    |
|                                |     | bucket 0, bucket 1, ...... bucket n|
|  vector<Bucket> buckets_ +--------->  |                                    |
|                                |     +-----------------+------------------+
+--------------------------------+                       |
                                                         v
                                       +------------------------------+
                                  +--> | Tensor 4, Tensor 5, Tensor 6 |
                                  |    +------------------------------+
+-------------------------------------+---+
| Bucket                               |  |
|                                      |  |
|  vector<BucketReplica> replicas +-------+
|     (BucketReplica, BucketReplica,...)  |
|                                      |  |
|  vector<size_t> variable_indices +-----------> <tensor index 4, tensor index 5, tensor 6>
|                                      |                          ^
+--------------------------------------+                          |
                                                                  |
bucket_indices                                                    |
+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
|                                                                       |
|  <tensor index 4, tensor index 5, tensor 6>                           |
|                                                                       |
|  ......                                                               |
|                                                                       |
|  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
|                                                                       |
+-----------------------------------------------------------------------+
```

## 0x03 BucketReplica

As discussed earlier, a BucketReplica represents [1..N] gradients to be reduced; these gradients share the same dtype and live on the same device. It holds part of the gradients to be computed for one model replica. Exactly which parameters it holds is determined by the bucket's variable_indices.

Its key member variables are:

- `std::vector<at::Tensor> variables`: the variables that contribute to this bucket replica. A refcounted value is used here so that the bucket contents can easily be unflattened back into the participating variables after the reduction completes.
- `at::Tensor contents`: the flattened (one-dimensional) contents of the bucket.
- `std::vector<at::Tensor> bucket_views_in`: views into the specific gradients inside contents, from the input side.
- `std::vector<at::Tensor> bucket_views_out`: views into the specific gradients inside contents, from the output side.

The comment in the source puts it this way:

Views serve as entry points to copy_ each grad's data in/out of the flat contents tensor.

### 3.1 Views

Some further notes on `std::vector<at::Tensor> bucket_views_in` and `std::vector<at::Tensor> bucket_views_out`:

- In PyTorch, a view is a tensor that shares memory with the original data; it merely rearranges the original data, displaying part of it directly or after reordering, without copying.
- Each view is created with a layout (sizes + strides) matching the gradient's expected layout.
- bucket_views_in and bucket_views_out provide ways to manipulate the specific gradients inside contents; users use these two variables as entry points to move each gradient's data in and out of contents.
- The reason two states are kept for bucket_views is that if a DDP communication hook is registered, bucket_views_out can be re-initialized with the value of the hook's future_work. A separate view reference into the replica's original contents is still needed for the bucket_views_in[i].copy_(grad) call.
- bucket_views_in[i].copy_(grad) and grad.copy_(bucket_views_out[i]) provide convenient ways to move gradient data into and out of contents.

In addition, the following three member variables store per-tensor information about the flat bucket. For example, offsets stores the offset of each tensor within the flat bucket contents.

```cpp
// Per-variable offset/length into the flat bucket contents tensor and grad
// bucket.
std::vector<size_t> offsets;
std::vector<size_t> lengths;

// Per-variable sizes into the grad bucket.
std::vector<c10::IntArrayRef> sizes_vec;
```
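To make this bookkeeping concrete, here is a small Python sketch (not PyTorch code, just an illustration with plain lists standing in for tensors) that flattens several "gradients" into one contents buffer, records per-variable offsets and lengths, and recovers each gradient via slicing, which is the role the bucket views play over contents:

```python
# Illustrative sketch only: plain Python lists stand in for tensors,
# and slices stand in for the bucket views into `contents`.
grads = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0]]  # per-variable gradients

contents = []            # flattened 1-D bucket contents
offsets, lengths = [], []
for g in grads:
    offsets.append(len(contents))  # where this variable starts
    lengths.append(len(g))         # how many elements it occupies
    contents.extend(g)

def view(i):
    """Recover variable i's gradient from the flat contents."""
    return contents[offsets[i]:offsets[i] + lengths[i]]

print(offsets)   # → [0, 3, 5]
print(lengths)   # → [3, 2, 1]
print(view(1))   # → [4.0, 5.0]
```

The real implementation records the same offsets/lengths while packing, but its views alias the flat tensor's memory instead of copying, as the next section explains.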

### 3.2 definitions

BucketReplica is specifically defined as:

```cpp
// A bucket replica represents [1..N] gradients to be reduced,
// with the same dtype, on the same device.
//
// Batching gradients together before reducing them can result in lower
// overhead and/or faster time to completion. Only gradients of the same type
// and on the same device can be batched. The tensor that represents the
// flattened gradient uses the same type and is placed on the same device.
// Buckets are filled as the gradients they hold are computed (triggered by
// autograd hooks). Buckets are reduced in a predetermined order that is
// identical across processes.
struct BucketReplica {
  // Flattened (1 dimensional) contents of bucket.
  at::Tensor contents; // The gradients are flattened into this tensor

  // Views into contents for each grad.  Each view will be created with
  // layout (sizes + strides) matching the grad's expected layout
  // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
  // `bucket_views_in[i].copy_(grad)` and
  // `grad.copy_(bucket_views_out[i])`
  // provide convenient ways to move grad data in/out of contents.
  // The reason we keep two states for bucket_views is that if DDP
  // communication hook was registered, `bucket_views_out` could be
  // re-initialized with the value of hook's `future_work`. We still need to
  // keep a separate view reference to replica's original contents for
  // `bucket_views_in[i].copy_(grad)` call.
  std::vector<at::Tensor> bucket_views_in;  // Input views into contents
  std::vector<at::Tensor> bucket_views_out; // Output views

  // Variables that contribute to this bucket replica. Use refcounted value
  // here so that we can easily unflatten the bucket contents into the
  // participating variables after reduction has completed.
  std::vector<at::Tensor> variables;

  // Per-variable offset/length into the flat bucket contents tensor and grad
  // bucket.
  std::vector<size_t> offsets;
  std::vector<size_t> lengths;

  // Per-variable sizes into the grad bucket.
  std::vector<c10::IntArrayRef> sizes_vec;

  // Number of tensors to be added before this bucket is complete.
  // This is reset to `variables.size()` every iteration.
  size_t pending;

  // TODO(@pietern)
  // Memory copies from gradient tensors into the bucket are potentially
  // done on different CUDA streams. We record an event for every copy
  // so that we can synchronize with them prior to kicking off the reduction.
  // std::vector<at::cuda::CUDAEvent> events;
};
```

So far, the logic is as follows. As mentioned earlier, for each bucket only replicas[0] is meaningful.

```
+--------------------------------+      +----------------------------------+
| Reducer                        |      | Bucket                           |
|                                |      |                                  |
|  vector<Bucket> buckets_ +--------->  |  Future future_work              |
|                                |      |  ProcessGroup::Work work         |
+--------------------------------+      |  vector<size_t> variable_indices |
     (bucket 0, bucket 1, ...)          |                                  |
                                        |  vector<BucketReplica> replicas  |
                                        |                +                 |
                                        +----------------|-----------------+
                                                         |
                                                         v
                                        +----------------------------------+
                                        | BucketReplica                    |
                                        |                                  |
                                        |  vector<Tensor> bucket_views_in  |
                                        |  vector<Tensor> bucket_views_out |
                                        |  Tensor contents                 |
                                        |  vector<Tensor> variables        |
                                        |                                  |
                                        +----------------------------------+
                                            (BucketReplica, ...)
```

### 3.3 initialization

Part of the initialization code is in Reducer::initialize_buckets.

```cpp
// Allocate bucket contents tensor.
replica.contents = at::empty({static_cast<long>(offset)}, options);
initialize_bucket_views(replica, replica.contents);
```

The specific code of initialize_bucket_views follows. First, a few PyTorch functions need explanation.

- as_strided: creates a view of an existing tensor with the given sizes and strides (the result is still a tensor) that shares memory with the original data. No new storage is allocated; it is purely a view.
- narrow: returns a tensor that is a narrowed (sub-range) version of the original tensor.

The main logic of initialize_bucket_views is:

- Traverse the replica's tensors. Handle each tensor differently depending on whether it is dense or sparse, and finally insert the resulting view into replica.bucket_views_in.
- Set replica.bucket_views_out to replica.bucket_views_in; by default the two are the same.
- If gradient_as_bucket_view_ is set to true, two situations need to be handled:
  - initialize_bucket_views can be called inside initialize_buckets when rebuild_buckets rebuilds the buckets. If grad was already defined/computed in the previous iteration, the old grad needs to be copied into the new bucket_view, and grad must be pointed at the new bucket_view.
  - initialize_bucket_views can also be called inside initialize_buckets during construction. Grads are not defined at construction time; in this case, do not let the gradient point at the bucket_view, because for globally unused parameters the gradient should remain undefined.

The specific codes are as follows:

```cpp
// (see Note:  "Gradient Layout Contract" in initialize_buckets).
void Reducer::initialize_bucket_views(
    Reducer::BucketReplica& replica,
    at::Tensor& contents) {
  for (size_t i = 0; i < replica.variables.size(); i++) { // iterate over the replica's tensors
    auto& v = replica.variables[i];
    const auto offset = replica.offsets[i];
    const auto length = replica.lengths[i];
    if (v.is_non_overlapping_and_dense()) {
      // If the param's memory is dense, match its layout, anticipating
      // the autograd engine (AccumulateGrad) will also create gradients
      // matching its layout.
      replica.bucket_views_in.push_back( // dense case
          contents.as_strided(v.sizes(), v.strides(), offset));
    } else {
      // Fall back to a C-style contiguous view, again anticipating
      // AccumulateGrad will do the same when stashing grads for non-dense
      // params.
      replica.bucket_views_in.push_back( // non-dense case
          contents.narrow(0, offset, length).view(v.sizes()));
    }
    // By default `bucket_views_out` and `bucket_views_in` are
    // essentially the same thing.
    replica.bucket_views_out = replica.bucket_views_in;

    // If gradient_as_bucket_view_ is set as true, then there are two cases to
    // handle: initialize_bucket_views could be called inside initialize_buckets
    // when rebuild_buckets, if grad has already been defined/calculated in
    // previous iteration, old grad needs to be copied into new bucket_view and
    // let grad point to the new bucket_view, initialize_bucket_views could also
    // be called inside initialize_buckets during construction. Grads are not
    // defined during construction time, in this case, do not let grad point to
    // bucket_view, because grads should be kept as being undefined for globally
    // unused parameters.
    if (gradient_as_bucket_view_) {
      auto& bucket_view = replica.bucket_views_in.back();
      runGradCallbackForVariable(v, [&](auto& grad) {
        if (grad.defined() && !grad.is_alias_of(bucket_view)) {
          bucket_view.copy_(grad);
          grad = bucket_view;
          // The grad is modified and needs to be written back.
          return true;
        }
        // The grad is not modified and does not need to be written back.
        return false;
      });
    }
  }
}
```

See the following figure for details:

```
+------------------------------------------+
| BucketReplica                            |
|                                          |
|  vector<Tensor> bucket_views_in +------------------+
|                                          |         |
|  vector<Tensor> bucket_views_out +-------------+   |
|                                          |     |   |
|                                          |     v   v
|                                          |  +-------------------------------------+
|  Tensor contents +-------------------------> |Flattened (Tensor1, Tensor2, Tensor3)|
|                                          |  +-------------------------------------+
|                                          |
|  vector<Tensor> variables +----------------> [Tensor1, Tensor2, Tensor3]
|                                          |
+------------------------------------------+
```

In addition, mark_variable_ready_sparse, mark_variable_ready_dense, and finalize_backward also assign to contents.

## 0x04 query class

The following two classes are used to let the autograd hook function determine the bucket corresponding to the tensor.

### 4.1 VariableIndex

VariableIndex determines the position of a tensor within the bucket structure. This is useful for the autograd hooks: an autograd hook callback only knows its own gradient tensor, but it needs to know which replica the tensor belongs to and where it sits inside that replica before it can proceed.

#### 4.1.1 member variables

Inside Reducer, the only standalone member variable of type VariableIndex is:

```cpp
std::vector<VariableIndex> unused_parameters_;
```

VariableIndex also appears as part of other member variables or parameters; for example, gradAccToVariableMap_ in Reducer uses VariableIndex:

```cpp
// Maps a grad_accumulator to its variable index, so unused parameters
// in the autograd graph can be found later.
std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;
```

#### 4.1.2 definitions

VariableIndex is defined as follows:

```cpp
// Locates a specific variable by replica index and variable index.
struct VariableIndex {
  size_t replica_index;  // which replica
  size_t variable_index; // variable index. Note: this is not "the position
                         // inside the replica", but the index among all
                         // variables. For example, if the model has 10
                         // parameters, variable_index runs from 0 to 9.
                         // The position inside the replica is determined by
                         // the VariableLocator below.

  VariableIndex() = default;

  VariableIndex(size_t replica_index_, size_t variable_index_) {
    replica_index = replica_index_;
    variable_index = variable_index_;
  }

  static size_t hash(const VariableIndex& key) {
    return c10::get_hash(key.replica_index, key.variable_index);
  }
};
```

In the constructor of Reducer, the following code sets up autograd_hook: it installs a hook for every tensor of every replica. If the autograd hook did not know which bucket a gradient belongs to, it could not tell DDP when a bucket as a whole is ready.

How do I find the bucket? You need to use the following VariableLocator.

```cpp
auto& variable = replicas_[replica_index][variable_index];
const auto index = VariableIndex(replica_index, variable_index); // generate a VariableIndex

hooks_.emplace_back(
    grad_accumulator->add_post_hook(
        torch::make_unique<torch::autograd::utils::LambdaPostHook>(
            [=](const torch::autograd::variable_list& outputs,
                const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
              this->rpc_context_.set(
                  ThreadLocalDistAutogradContext::getContextPtr());
#endif
              // The hook argument is a VariableIndex, so the hook can
              // locate its tensor.
              this->autograd_hook(index);
              return outputs;
            })),
    grad_accumulator);
```

### 4.2 VariableLocator

#### 4.2.1 definitions

VariableLocator is used to locate a variable inside the bucket structure. To find a tensor's position, we need to know which bucket it is in and where it sits among that bucket's tensors.

- Which bucket: `bucket_index` is the position in Reducer's `buckets_` list, identifying one bucket.
- Where inside the bucket replica: `intra_bucket_index` is the index of the variable in the vector fields of bucket.replica.

```cpp
// A variable locator locates a particular variable in the bucket
// structure. The `bucket_index` field points to the bucket in the `buckets_`
// vector. The `intra_bucket_index` field points to the index of the variable
// in any of the vector fields in the bucket replica.
struct VariableLocator {
  // Index into the `buckets_` variable.
  size_t bucket_index;       // which bucket
  // Index of parameter in single bucket replica.
  size_t intra_bucket_index; // position inside the bucket replica

  VariableLocator() = default;

  VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
    bucket_index = bucket_index_;
    intra_bucket_index = intra_bucket_index_;
  }
};
```

#### 4.2.2 member variables

The member variables of Reducer are:

```cpp
// Map the index of a variable to its location in the bucket structure.
std::vector<VariableLocator> variable_locators_;
```

##### 4.2.2.1 initialization

How to initialize?

```cpp
void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {
  // Clear current bucket assignment.
  buckets_.clear();
  variable_locators_.clear();

  // Ensure we have a bucket index for every variable.
  variable_locators_.resize(replicas_[0].size());

  // Iterate over buckets.
  const auto bucket_count = bucket_indices.size();
  const auto replica_count = replicas_.size();
  buckets_.reserve(bucket_count);
  for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) {
    // Map participating variables to this bucket.
    // This is identical across replicas so we only need to do this once.
    size_t intra_bucket_index = 0;
    for (const auto variable_index : bucket_indices[bucket_index]) {
      // Each tensor index is unique across all buckets.
      variable_locators_[variable_index] =
          VariableLocator(bucket_index, intra_bucket_index++); // intra_bucket_index increments
    }
  }
}
```

Question: won't `variable_locators_[variable_index]` collide across different buckets? No: by construction, the combination (bucket_index, intra_bucket_index) is unique.

Let's give an example. Every tensor gets an index, increasing from 0 to tensors.size() - 1. If the model's parameters comprise 20 tensors in total, the tensor indices range from 0 to 19. If they are divided into six buckets, each tensor index is unique across the six buckets.

```
+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
|                                                                       |
|  <tensor index 4, tensor index 5, tensor index 6>                     |
|                                                                       |
|  ......                                                               |
|                                                                       |
|  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
|                                                                       |
+-----------------------------------------------------------------------+
```

The corresponding `variable_locators_` is then:

```
variable_locators_[tensor index 0] = VariableLocator(bucket 0, 0) // tensor index 0 is the first variable of bucket 0
variable_locators_[tensor index 1] = VariableLocator(bucket 0, 1) // tensor index 1 is the second variable of bucket 0
variable_locators_[tensor index 2] = VariableLocator(bucket 0, 2) // tensor index 2 is the third variable of bucket 0
variable_locators_[tensor index 3] = VariableLocator(bucket 0, 3) // tensor index 3 is the fourth variable of bucket 0
```
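This mapping can be sketched in Python (a hypothetical helper, not PyTorch code), mirroring the double loop in `initialize_buckets`:

```python
# Build a locator table: every global tensor index maps to a unique
# (bucket_index, intra_bucket_index) pair, as in Reducer::initialize_buckets.
def build_variable_locators(bucket_indices):
    """bucket_indices[b] lists the global tensor indices assigned to bucket b."""
    total = sum(len(b) for b in bucket_indices)
    locators = [None] * total
    for bucket_index, indices in enumerate(bucket_indices):
        for intra_bucket_index, variable_index in enumerate(indices):
            locators[variable_index] = (bucket_index, intra_bucket_index)
    return locators

# 7 tensors split into 2 buckets, mirroring the example above.
locators = build_variable_locators([[0, 1, 2, 3], [4, 5, 6]])
assert locators[0] == (0, 0)  # tensor 0 is the first variable of bucket 0
assert locators[5] == (1, 1)  # tensor 5 is the second variable of bucket 1
```

Because every global index appears in exactly one bucket, the table has no collisions.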

##### 4.2.2.2 use

How to use? Let's use the following as an example.

When the autograd hook fires, it is called back with a VariableIndex:

```cpp
this->autograd_hook(index);
```

autograd_hook eventually calls mark_variable_ready_dense, which uses variable_locators_ to determine the bucket and then performs the subsequent operations.

```cpp
void Reducer::mark_variable_ready_dense(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index]; // find the locator for this tensor
  auto& bucket = buckets_[bucket_index.bucket_index];            // find the bucket
  auto& replica = bucket.replicas[replica_index];                // find the corresponding replica in the bucket
  auto& variable = replica.variables[bucket_index.intra_bucket_index]; // found the tensor
  const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // the tensor's offset
  const auto length = replica.lengths[bucket_index.intra_bucket_index]; // and length
  auto& bucket_view =
      replica.bucket_views_in[bucket_index.intra_bucket_index]; // now we can continue processing

  // Copy contents of gradient tensor to bucket tensor.
  // If the gradient is not set, we assume it wasn't computed
  // as part of the current backwards pass, and zero the part
  // of the bucket it would otherwise hold.
  runGradCallbackForVariable(variable, [&](auto& grad) {
    if (grad.defined()) {
      this->check_grad_layout(grad, bucket_view);
      // When gradient_as_bucket_view_ is false, or even when
      // gradient_as_bucket_view_ is true, in rare cases users may set grad to
      // be None after every iteration. In these cases, grad and bucket_view are
      // pointing to different storages and thus need to copy grads to
      // bucket_view. If gradient_as_bucket_view_ is set as true, let grad point
      // to bucket_view. If grad has already been set as views of buckets in
      // previous iterations, no copy is needed.
      if (!grad.is_alias_of(bucket_view)) {
        this->copy_grad_to_bucket(grad, bucket_view);
        if (gradient_as_bucket_view_) {
          // Let grad point to bucket_view buffer.
          grad = bucket_view;
          // The grad is modified and needs to be written back.
          return true;
        }
      } else {
        // If grad and bucket_view point to the same storage, no copy is needed.
        if (comm_hook_ == nullptr) {
          bucket_view.div_(divFactor_);
        }
      }
    } else {
      bucket_view.zero_();
    }
    // The grad is not modified and doesn't need to be written back.
    return false;
  });
}
```

## 0x05 cumulative correlation class

The following are gradient accumulation related classes.

### 5.1 grad_accumulators_

grad_accumulators_ can be regarded as a matrix. Each entry of the matrix is an AccumulateGrad (a Node type), which is used to accumulate the gradient. At present this is just bookkeeping.

```cpp
std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>> grad_accumulators_;
```

As shown in the figure below, variable1 is an actual tensor, and an entry of grad_accumulators_ points to variable1's AccumulateGrad.

```
 variable1 +----+
                |
                v
+-----------------------------------+    +-------------------------+
| grad_accumulators_                |    | Variable                |
|                                   |    |                         |
|                                   |    |  +------------------+   |
|  [replica_index][variable_index]+----->+  | AccumulateGrad   |   |
|                                   |    |  |                  |   |
|                                   |    |  |                  |   |
+-----------------------------------+    |  |  post_hooks_+--------->  autograd_hook(index)
                                         |  |                  |   |
                                         |  +------------------+   |
                                         +-------------------------+
```

#### 5.1.1 initialization

How is it initialized? In the Reducer constructor:

```cpp
{
  const auto replica_count = replicas_.size();
  // The two loops below traverse all tensors.
  for (size_t replica_index = 0; replica_index < replica_count; replica_index++) {
    for (size_t variable_index = 0; variable_index < variable_count; variable_index++) {
      auto& variable = replicas_[replica_index][variable_index];
      const auto index = VariableIndex(replica_index, variable_index);

      // The gradient accumulator function is lazily initialized once.
      // Therefore we can use its presence in the autograd graph as
      // evidence that the parameter has participated in an iteration.
      auto grad_accumulator = // get this tensor's grad_accumulator
          torch::autograd::impl::grad_accumulator(variable);

      // Hook to execute after the gradient accumulator has executed.
      hooks_.emplace_back(
          grad_accumulator->add_post_hook(
              torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                  [=](const torch::autograd::variable_list& outputs,
                      const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
                    this->rpc_context_.set(
                        ThreadLocalDistAutogradContext::getContextPtr());
#endif
                    this->autograd_hook(index);
                    return outputs;
                  })),
          grad_accumulator);

      // Map raw function pointer to replica index and parameter index.
      // This is used later on when the autograd graph is traversed
      // to check for parameters for which no gradient is computed, if
      // find_unused_parameters=True.
      // Note that the mapping of gradient accumulator to variable should be
      // one to one as we deduplicate shared parameters before constructing
      // Reducer.
      if (find_unused_parameters_) {
        gradAccToVariableMap_[grad_accumulator.get()] = index;
      }

      numGradHooksTriggeredMap_[index] = 0;

      // The gradient accumulator is stored as weak_ptr in the autograd
      // metadata of the variable, so we have to keep it alive here for
      // the raw pointer to be valid.
      grad_accumulators_[replica_index][variable_index] =
          std::move(grad_accumulator); // store this tensor's grad_accumulator
    }
  }
}
```

#### 5.1.2 use

The Node returned by grad_accumulator, i.e. AccumulateGrad, is of type Node. Here is the relevant code:

```cpp
std::shared_ptr<Node> grad_accumulator(const Variable& self) {
  auto autograd_meta = get_autograd_meta(self);
  std::lock_guard<std::mutex> lock(autograd_meta->mutex_);
  auto result = autograd_meta->grad_accumulator_.lock();
  if (result)
    return result;

  c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
  auto intrusive_from_this =
      c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
  result = std::make_shared<AccumulateGrad>(
      Variable(std::move(intrusive_from_this)));
  autograd_meta->grad_accumulator_ = result;
  return result;
}
```
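The lazy-initialization pattern above (cache the accumulator as a weak reference, recreate it only on first access) can be sketched in Python; `AutogradMeta` and the helper below are hypothetical stand-ins for the C++ structures:

```python
# Sketch of lazy accumulator initialization with a weak-reference cache,
# analogous to autograd_meta->grad_accumulator_ being a weak_ptr.
import weakref

class AccumulateGrad:
    def __init__(self, variable):
        self.variable = variable

class AutogradMeta:
    def __init__(self):
        self.grad_accumulator_ref = None  # weak reference, like weak_ptr

def grad_accumulator(meta, variable):
    if meta.grad_accumulator_ref is not None:
        result = meta.grad_accumulator_ref()  # try to "lock" the weak ref
        if result is not None:
            return result
    # Not created yet (or already collected): build and cache it.
    result = AccumulateGrad(variable)
    meta.grad_accumulator_ref = weakref.ref(result)
    return result

meta = AutogradMeta()
acc = grad_accumulator(meta, "variable1")
assert grad_accumulator(meta, "variable1") is acc  # cached while alive
```

This is why the Reducer must hold a strong reference in grad_accumulators_: the weak reference alone would let the accumulator be destroyed.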

### 5.2 gradAccToVariableMap_

gradAccToVariableMap_ is defined as follows:

```cpp
std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;
```

Its function is to give each Node a corresponding VariableIndex; as shown in the figure, variable1 maps to index1:

```
                                                  +--------------+
                                          +-----> | Variable     |
                                          |       +--------------+
+-------------------------------------+   |
| gradAccToVariableMap_               |   +
|                                     |
|  <Node*, VariableIndex> +---------> [variable1 : index1, variable2 : index2]
|                                     |                +
+-------------------------------------+                |
                                                       v
                                        +---------------------------------+
                                        | VariableIndex                   |
                                        |                                 |
                                        |   replica_index of Variable1    |
                                        |                                 |
                                        |   variable_index of Variable1   |
                                        +---------------------------------+
```

#### 5.2.1 initialization

How is it initialized? In the Reducer constructor, each variable that requires a gradient gets a VariableIndex:

```cpp
auto& variable = replicas_[replica_index][variable_index];
const auto index = VariableIndex(replica_index, variable_index);
auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);
if (find_unused_parameters_) {
  gradAccToVariableMap_[grad_accumulator.get()] = index;
}
```

#### 5.2.2 use

gradAccToVariableMap_ is used in search_unused_parameters: after traversing the autograd graph, every accumulator function recorded in gradAccToVariableMap_ that does not appear in the traversed graph belongs to a parameter whose gradient will not be computed.

```cpp
// Traverse the autograd graph starting at the specified output.
// All parameters for which we have a pointer to their gradient accumulation
// functions, but don't show up in the autograd graph will be marked ready for
// reduction as soon as the first autograd hook is called. This is not
// done immediately because the model output may be ignored, and we only
// want to start performing reductions on `torch.autograd.backward()`.
void Reducer::search_unused_parameters(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::unordered_set<torch::autograd::Node*> seen;
  std::vector<torch::autograd::Node*> queue;

  // Seed queue with the grad functions of all outputs.
  for (const auto& output : outputs) {
    const auto& grad_fn = output.grad_fn();
    if (grad_fn) {
      queue.push_back(grad_fn.get());
    }
  }

  // Traverse the autograd graph starting at the specified output.
  while (!queue.empty()) {
    auto fn = queue.back();
    queue.pop_back();
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) {
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) {
          queue.push_back(next_ptr);
        }
      }
    }
  }

  // Find accumulator functions that don't show up in this graph:
  // their parameters will not have a gradient computed.
  for (const auto& it : gradAccToVariableMap_) {
    // If the accumulator function is present in the graph, we know
    // a gradient will be computed for the corresponding parameter.
    if (seen.count(it.first) == 0) {
      unused_parameters_.push_back(it.second);
    }
  }
}
```
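The traversal logic can be sketched in Python, with a hypothetical `Node` class standing in for `torch::autograd::Node`:

```python
# Simplified sketch of search_unused_parameters: walk the autograd graph
# from the outputs, then report every accumulator that was never reached.
class Node:
    def __init__(self, name, next_edges=()):
        self.name = name
        self.next_edges = list(next_edges)

def search_unused(outputs, grad_acc_to_variable):
    seen, queue = set(), [out for out in outputs if out is not None]
    while queue:                      # depth-first traversal, like the C++ code
        fn = queue.pop()
        for nxt in fn.next_edges:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    # Accumulators not present in the traversed graph get no gradient.
    return [idx for acc, idx in grad_acc_to_variable.items() if acc not in seen]

acc1, acc2 = Node("acc1"), Node("acc2")
loss = Node("loss", next_edges=[Node("mul", next_edges=[acc1])])
unused = search_unused([loss], {acc1: 0, acc2: 1})
assert unused == [1]  # acc2 never appears in the graph, so variable 1 is unused
```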

### 5.3 numGradHooksTriggeredMap_

Records how many times a tensor's autograd_hook should be triggered before the tensor's gradient is ready. The map does not change after the first iteration, so each value is 1 or 0. It is used to set unused_parameters_ and to initialize numGradHooksTriggeredMapPerIteration_.

```cpp
// Key: VariableIndex, Value: the number of times that a variable's
// autograd_hook() should be triggered before marking this variable's grad as
// ready for communication. Map will not change after 1st iteration.
std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
    numGradHooksTriggeredMap_;
```

#### 5.3.1 initialization

How is it initialized? In the constructor:

```cpp
numGradHooksTriggeredMap_[index] = 0;
```

During the first iteration of a static graph, each call to autograd_hook increments the count by one:

```cpp
// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {
  // Omit some code

  if (static_graph_first_iteration()) {
    numGradHooksTriggeredMap_[index] += 1; // incremented in the first iteration of a static graph
    return; // then return directly -- note this!
  }

  // If `find_unused_parameters_` is true there may be model parameters that
  // went unused when computing the model output, they won't be part of the
  // autograd graph, and won't receive gradients. These parameters are
  // discovered in the `prepare_for_backward` function and their indexes stored
  // in the `unused_parameters_` vector.
  if (!has_marked_unused_parameters_) {
    has_marked_unused_parameters_ = true;
    for (const auto& unused_index : unused_parameters_) {
      mark_variable_ready(unused_index);
    }
  }

  // If it is static graph, after 1st iteration, check if a variable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) {
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index);
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index);
  }
}
```

#### 5.3.2 use

How is it used? It is reset in reset_bucket_counting:

```cpp
void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) {
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size();
    }
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}
```

It is also checked in delay_all_reduce: if the count is 0, the index is inserted into unused_parameters_.

```cpp
// Right now delay_all_reduce is only called when static_graph_=true and
// num_iterations_==1.
void Reducer::delay_all_reduce() {
  // Omit some code

  // copy all gradients to buckets
  for (size_t replica_index = 0; replica_index < replicas_.size(); replica_index++) {
    for (size_t variable_index = 0; variable_index < replicas_[replica_index].size(); variable_index++) {
      const auto index = VariableIndex(replica_index, variable_index);
      // set unused_parameters_
      if (numGradHooksTriggeredMap_[index] == 0) { // if 0, insert into unused_parameters_
        unused_parameters_.push_back(index);
      }
      require_finalize_ = true;
      set_divide_factor();
      if (expect_sparse_gradients_[replica_index][variable_index]) {
        mark_variable_ready_sparse(index);
      } else {
        mark_variable_ready_dense(index);
      }
    }
  }

  // launch all reduces for all buckets
  for (auto& bucket : buckets_) {
    all_reduce_bucket(bucket);
  }

  finalize_backward();
}
```

### 5.4 numGradHooksTriggeredMapPerIteration_

Records how many more times a tensor's autograd_hook needs to be called before the tensor's gradient is ready. When the count reaches 0, the variable's gradient is ready for communication.

This member variable is reset from numGradHooksTriggeredMap_.

```cpp
// Key: VariableIndex, Value: the number of times that a variable's
// autograd_hook() are left to be triggered before marking this variable's
// grad as ready for communication. Map will change after 1st iteration to
// track a grad is ready for communication or not.
std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>>
    numGradHooksTriggeredMapPerIteration_;
```

#### 5.4.1 use

How is it used? For a static graph, after the first iteration (when the gradient has just been produced), numGradHooksTriggeredMapPerIteration_[index] is decremented. When it reaches 0, the variable is ready for the collective operation.

```cpp
// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {
  // Omit other code

  // If it is static graph, after 1st iteration, check if a variable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) {
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index);
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index);
  }
}
```

This value is reset at the start of each new iteration: prepare_for_backward calls reset_bucket_counting, which restores it from numGradHooksTriggeredMap_.

```cpp
void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) {
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size();
    }
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}
```

Let's walk through the concrete logic:

- Tensor 2 is not used, so delay_all_reduce puts it directly into unused_parameters_.
- For tensor 1:
  - numGradHooksTriggeredMap_ is initialized to 0.
  - After the first iteration it becomes 1.
  - During backward propagation, prepare_for_backward calls reset_bucket_counting, and numGradHooksTriggeredMap_ is copied into numGradHooksTriggeredMapPerIteration_.
  - autograd_hook decrements the count; when it reaches 0, the variable is set to ready.

```
Variable 2

    delay_all_reduce

    numGradHooksTriggeredMap_[2] = 0  +--------->  unused_parameters_.push_back(0)

+----------------------------------------------------------------------------------------+

Variable 1

    numGradHooksTriggeredMap_[1] = 0
              +
              |  first_iteration
              v
    numGradHooksTriggeredMap_[1] = 1
              +
              |  prepare_for_backward
              |  reset_bucket_counting
              v
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_
              +
              |  backward
              |  autograd_hook
              v                                            YES
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0)? +------->  mark_variable_ready(1)
              +
              |  NO
              v
```
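The counting life cycle above can be sketched in Python, with a hypothetical `HookCounter` class standing in for the Reducer's bookkeeping:

```python
# Sketch of static-graph hook counting: count hook calls in iteration 1,
# then copy the map per iteration and decrement; a variable becomes ready
# when its counter hits 0.
class HookCounter:
    def __init__(self, num_vars):
        self.num_grad_hooks_triggered = {i: 0 for i in range(num_vars)}
        self.per_iteration = {}
        self.first_iteration = True
        self.ready = []

    def reset_bucket_counting(self):
        # numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_
        self.per_iteration = dict(self.num_grad_hooks_triggered)
        self.ready = []

    def autograd_hook(self, index):
        if self.first_iteration:
            self.num_grad_hooks_triggered[index] += 1  # count, then return
            return
        self.per_iteration[index] -= 1
        if self.per_iteration[index] == 0:
            self.ready.append(index)  # mark_variable_ready(index)

    def unused_parameters(self):
        # delay_all_reduce: a count of 0 means the parameter was never used
        return [i for i, n in self.num_grad_hooks_triggered.items() if n == 0]

c = HookCounter(num_vars=2)
c.autograd_hook(0)          # first iteration: only variable 0 fires
c.first_iteration = False
assert c.unused_parameters() == [1]

c.reset_bucket_counting()   # a later iteration
c.autograd_hook(0)
assert c.ready == [0]       # counter for variable 0 reached zero
```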

### 5.5 perIterationReadyParams_

perIterationReadyParams_ holds the indices of the parameters that have been marked ready in the current iteration.

```cpp
// Per iteration set of parameter indices that have been marked ready.
std::unordered_set<size_t> perIterationReadyParams_;
```

#### 5.5.1 setting

When a variable becomes ready, it is inserted into perIterationReadyParams_:

```cpp
void Reducer::mark_variable_ready(VariableIndex index) {
  if (should_rebuild_buckets()) {
    push_rebuilt_params(index);
  }

  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;

  if (replica_index == 0) {
    checkAndRaiseMarkedTwiceError(variable_index);
    perIterationReadyParams_.insert(variable_index);
  }
}
```

#### 5.5.2 reset

This variable is reset before back propagation.

```cpp
void Reducer::prepare_for_backward(
    const std::vector<torch::autograd::Variable>& outputs) {
  // Reset per iteration marked ready parameters.
  perIterationReadyParams_.clear();
}
```

#### 5.5.3 use

Usage is to check against perIterationReadyParams_ and return the parameters that are not in it.

In the rebuild_buckets method, ensure_prior_reduction_finished is called, and it calls the following two methods for verification:

```cpp
std::vector<std::string> Reducer::getUnmarkedParamsForIteration() {
  std::vector<std::string> unMarkedParamNames;
  for (const auto& it : param_names_) {
    if (perIterationReadyParams_.find(it.first) ==
        perIterationReadyParams_.end()) {
      unMarkedParamNames.push_back(it.second);
    }
  }
  return unMarkedParamNames;
}

std::vector<size_t> Reducer::getUnmarkedParamIndicesForIteration() {
  std::vector<size_t> unmarked_param_indices;
  const auto variable_count = replicas_[0].size();
  for (size_t variable_index = 0; variable_index < variable_count;
       variable_index++) {
    if (perIterationReadyParams_.find(variable_index) ==
        perIterationReadyParams_.end()) {
      unmarked_param_indices.push_back(variable_index);
    }
  }
  return unmarked_param_indices;
}
```

### 5.6 used parameters

The following two variables record locally used parameters: whether each parameter has been used locally during the current iteration, or during the current no_sync session when no_sync is on.

Each model replica corresponds to one tensor in the map, and each tensor is a one-dimensional int32 tensor with one entry per parameter.

These tensors are marked in autograd_hook to indicate that the corresponding parameter has been used. They are allreduced at the end of the backward pass of the current iteration or no_sync session, in order to compute the globally unused parameters.

```cpp
// Locally used parameter maps indicating if parameters are used locally
// during the current iteration or no_sync session if no_sync is on. One
// tensor for each model replica and each tensor is one-dim int32 tensor of
// number of parameters. These tensors are marked in autograd_hook to indicate
// the corresponding param has been used, and get allreduced in the end of
// backward of current iteration or no_sync session for figuring out the
// globally unused parameters.
//
// local_used_maps_:     CPU tensors for bookkeeping locally used params
// local_used_maps_dev_: dev tensors for reducing globally unused params
std::vector<at::Tensor> local_used_maps_;     // set in autograd_hook; the bitmap described in the paper
std::vector<at::Tensor> local_used_maps_dev_; // GPU
```

#### 5.6.1 the paper

This can be understood together with the DDP paper:

The gradients of globally unused parameters should stay untouched during the forward and backward passes. Detecting unused parameters requires global information: a parameter may be absent from one process's computation yet still participate in training during the same iteration on another process. Therefore, DDP keeps information about locally unused parameters in a bitmap and launches an extra AllReduce to obtain the global bitmap. Since the bitmap is much smaller than the tensors, all parameters of the model share one bitmap instead of creating per-bucket bitmaps. The bitmap lives on the CPU to avoid launching a dedicated CUDA kernel for every update. However, some ProcessGroup backends may not be able to run AllReduce on CPU tensors; for example, ProcessGroupNCCL only supports CUDA tensors. Moreover, since DDP should work with any custom ProcessGroup backend, it cannot assume that every backend supports CPU tensors. To solve this, DDP maintains another bitmap on the same device as the first model parameter and issues a non-blocking copy that moves the CPU bitmap to the device bitmap for the collective communication.
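The bitmap idea can be sketched in plain Python, simulating the AllReduce by summing the per-process bitmaps (all names below are hypothetical):

```python
# Sketch of the bitmap mechanism: each process marks the parameters it used
# locally, the bitmaps are allreduced (summed), and parameters with a global
# count of 0 are globally unused.
def allreduce_sum(bitmaps):
    """Simulate an AllReduce(SUM) over per-process bitmaps."""
    return [sum(bits) for bits in zip(*bitmaps)]

num_params = 4
# Process 0 used params 0 and 1; process 1 used params 0 and 2.
local_used = [[1, 1, 0, 0], [1, 0, 1, 0]]
global_used = allreduce_sum(local_used)
globally_unused = [i for i, n in enumerate(global_used) if n == 0]

assert global_used == [2, 1, 1, 0]
assert globally_unused == [3]  # param 3 was used by no process
```

Because the bitmap has one int per parameter rather than one per gradient element, this extra AllReduce is cheap compared to the gradient buckets.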

#### 5.6.2 initialization

The initialization function is as follows:

```cpp
void Reducer::initialize_local_used_map() {
  const auto replica_count = replicas_.size();
  const auto variable_count = replicas_[0].size();
  local_used_maps_.resize(replica_count);
  local_used_maps_dev_.resize(replica_count);

  for (size_t i = 0; i < replica_count; i++) {
    at::TensorOptions options;
    options = options.dtype(at::kInt);

    // Deliberately don't pin the memory even if local_used_maps_dev_ will
    // be cuda. See Note [local_used_maps_ -> local_used_maps_dev copying]
    local_used_maps_[i] =
        at::zeros({static_cast<long>(variable_count)}, options);

    // This tensor needs to be on the same device as replica because backend
    // such as NCCL may not support CPU tensors, and hence it might not work
    // if we always put it on CPU.
    options = options.device(replicas_[i][0].device());
    local_used_maps_dev_[i] =
        at::empty({static_cast<long>(variable_count)}, options);
  }
}
```

#### 5.6.3 reset

finalize_bucket_dense and finalize_backward reset them.

```cpp
void Reducer::finalize_backward() {
  if (dynamic_graph_find_unused()) {
    // Reset unused parameter accounting.
    // See Note [local_used_maps_ -> local_used_maps_dev copying]
    for (auto& local_used : local_used_maps_) {
      local_used.fill_(0);
    }
    local_used_maps_reduced_ = false;
  }
  // Omit other code
}
```

#### 5.6.4 setting

In autograd_hook, if the parameter is used, its entry is set to 1:

```cpp
void Reducer::autograd_hook(VariableIndex index) {
  // Record here that the parameter has been used.
  // See Note [Skip allreducing local_used_maps_dev]
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Since it gets here, this param has been used for this iteration. We want
    // to mark it in local_used_maps_. During no_sync session, the same var can
    // be set multiple times, which is OK as does not affect correctness. As
    // long as it is used once during no_sync session, it is marked as used.
    local_used_maps_[index.replica_index][index.variable_index] = 1;
  }
  // Omit other code
}
```

#### 5.6.5 use

all_reduce_local_used_map is called in mark_variable_ready; if synchronization is needed, it happens here. Let's translate the code comments:

- DDP uses an asynchronous H2D copy to avoid blocking overhead. The async copy and the allreduce respect the current stream, so they are sequenced correctly.
- Correct sequencing with respect to host operations is also essential. The H2D copy_ is stream ordered, while the host's changes to local_used_maps_ are host ordered.
- If a large backlog of CUDA-stream work pushes the copy_ far into the future, and no blocking calls occur between now and finalize_backward, then finalize_backward will re-zero local_used_maps_ on the host before the stream executes the copy_. In that case, copy_ will read those zeros instead of the values we told it to read here.
- Copying local_used_maps_[i] to a pinned temporary (which the pinned caching allocator should supply asynchronously) avoids this nasty, rare race condition.
- In the hoped-for case where all parameters are used, DDP itself does no blocking work between now and the re-zeroing, so the danger is real.
- Therefore, the Reducer defensively ensures that local_used_maps_tmp is distinct from local_used_maps_[i].

```cpp
void Reducer::all_reduce_local_used_map() {
  // See Note [Skip allreducing local_used_maps_dev]
  // H2D from local_used_maps_ to local_used_maps_dev_
  for (size_t i = 0; i < local_used_maps_.size(); i++) {
    if (local_used_maps_dev_[i].is_cuda()) {
      // Note [local_used_maps_ -> local_used_maps_dev copying]
      // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      // We do async H2D to avoid the blocking overhead. The async copy and
      // allreduce respect the current stream, so will be sequenced
      // correctly.
      //
      // Correct sequencing with respect to host operations is also
      // essential. The H2D copy_ is stream ordered, while the host's
      // changes to local_used_maps_ are host ordered. If a large backlog of
      // cuda-stream work pushes the copy_ far into the future, and if no
      // blocking calls occur between now and finalize_backward()** such
      // that finalize_backward() re-zeroes local_used_maps_ on the host
      // before the stream executes the copy_, copy_ will read those zeros
      // instead of the values we thought we told it to read here. Copying
      // local_used_maps_[i] to a pinned temporary (which the pinned caching
      // allocator should supply asynchronously) avoids this nasty, rare
      // race condition.
      //
      // ** In the hoped-for case where all params are used, DDP itself
      // won't do any blocking work between now and the re-zeroing, so the
      // danger is real.
      //
      // Defensively ensures local_used_maps_tmp is distinct from
      // local_used_maps_[i]
      auto local_used_maps_tmp = at::native::empty_like(
          local_used_maps_[i],
          optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
          local_used_maps_[i].options().layout_opt(),
          local_used_maps_[i].options().device_opt(),
          true /* pinned_memory */);
      // Paranoid asserts here because in some workloads, the pinned
      // allocator behaves in a way we don't understand, and may be bugged.
      // See https://github.com/pytorch/pytorch/pull/54474
      TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
      TORCH_INTERNAL_ASSERT(
          local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
      local_used_maps_tmp.copy_(local_used_maps_[i]);
      local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
    } else {
      local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
    }
  }
  local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
}
```

### 5.7 Support for gradient computation

Next, we analyze some basic functions and support classes involved in calculating gradients.

#### 5.7.1 RpcContext

This class is used to encapsulate distributed::autograd::ContextPtr.

```cpp
struct RpcContext {
  using ContextPtr = torch::distributed::autograd::ContextPtr;
  // The shared_ptr is to hold the context instance.
  ContextPtr context_ptr_holder;
  std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

  void set(ContextPtr&& new_context_ptr);
};
RpcContext rpc_context_;
```

#### 5.7.2 hooks_

Its job is to maintain the autograd hooks and their bookkeeping.

```cpp
std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
    hooks_;
```

Initialization is as follows:

```cpp
// Hook to execute after the gradient accumulator has executed.
hooks_.emplace_back(
    grad_accumulator->add_post_hook(
        torch::make_unique<torch::autograd::utils::LambdaPostHook>(
            [=](const torch::autograd::variable_list& outputs,
                const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
              this->rpc_context_.set(
                  ThreadLocalDistAutogradContext::getContextPtr());
#endif
              this->autograd_hook(index);
              return outputs;
            })),
    grad_accumulator);
```

#### 5.7.3 comm_hook_

##### 5.7.3.1 concept

Let's look at the concept through the source comment Note [DDP Communication Hook].

The DDP communication hook is an enhancement: it provides a hook that can override how DDP communicates gradients across ranks, which can be used for algorithms such as gradient compression / GossipGrad. Hook functions are registered via the Python API register_comm_hook.

If no DDP communication hook is registered, the reducer reduces the buckets by simply calling allreduce. If one is registered, the hook is called and its future work handle is used. In that case the reducer also skips the "divide gradients by world size" step. The intent is that the communication hook completely overrides how communication is performed, and the user has full control over how the gradients are handled.

PythonCommHook is a subclass of CommHookInterface and enables registering a Python hook. In addition, there are some built-in C++ hook implementations, which can be selected by calling the Python API register_builtin_comm_hook.

```cpp
// Note [DDP Communication Hook]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// If DDP communication hook is not registered, the reducer reduces the buckets
// by just calling allreduce. If registered, it calls the hook and uses future
// work handle. If registered, reducer also skips dividing grads by world size.
// The reason for this is that the communication hook is expected to completely
// override how we perform communication and the user should have complete
// control over how the grads are handled.
//
// DDP communication hook is an enhancement that provides a hook which can be
// used to override how DDP communicates gradients across ranks, this can be
// used for algorithms like Gradient Compression/GossipGrad. This hook can be
// registered from Python API using `register_comm_hook`. `PythonCommHook`
// enables registering a Python hook and is a subclass of `CommHookInterface`.
// Additionally, there are also some built-in C++ hook implementations that can
// be specified by calling `register_builtin_comm_hook` from Python API.
```

##### 5.7.3.2 use

Let's look at torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py for an example.

The following hook performs its own special processing before and after the allreduce. To use it, call ddp_model.register_comm_hook(process_group, fp16_compress_hook).

```python
def fp16_compress_hook(
    process_group: dist.ProcessGroup, bucket: dist.GradBucket
) -> torch.futures.Future:
    """
    This DDP communication hook implements a simple gradient compression
    approach that casts ``GradBucket`` tensors to half-precision floating-point
    format (``torch.float16``) and then divides it by the process group size.
    It allreduces those ``float16`` gradient tensors. Once compressed gradient
    tensors are allreduced, the chained callback ``decompress`` casts it back
    to the input data type (such as ``float32``).

    Example::
        >>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
    """
    group_to_use = process_group if process_group is not None else dist.group.WORLD
    world_size = group_to_use.size()

    compressed_tensor = bucket.get_tensor().to(torch.float16).div_(world_size)

    fut = dist.all_reduce(
        compressed_tensor, group=group_to_use, async_op=True
    ).get_future()

    def decompress(fut):
        decompressed_tensor = bucket.get_tensor()
        # Decompress in place to reduce the peak memory.
        # See: https://github.com/pytorch/pytorch/issues/45968
        decompressed_tensor.copy_(fut.value()[0])
        return [decompressed_tensor]

    return fut.then(decompress)
```

#### 5.7.4 runGradCallbackForVariable

The mark_variable_ready_dense function calls runGradCallbackForVariable.

##### 5.7.4.1 Reducer

Reducer's runGradCallbackForVariable is as follows. It delegates the processing to distributed::autograd::ContextPtr's runGradCallbackForVariable.

```cpp
void Reducer::runGradCallbackForVariable(
    at::Tensor& variable,
    GradCallback&& cb) {
  // Load rpc context
  auto context_ptr = rpc_context_.context_ptr.load();
  if (context_ptr == nullptr) {
    cb(variable.mutable_grad());
  } else {
    // Under distributed autograd
#ifndef _WIN32
    // Analyzed below
    context_ptr->runGradCallbackForVariable(variable, std::move(cb));
#endif
  }
}
```

##### 5.7.4.2 DistAutogradContext

Let's follow the call into DistAutogradContext.

Among the accumulated gradients, it looks up the gradient corresponding to the tensor in accumulatedGrads_, processes that gradient with the passed-in callback, and finally copies the processed gradient back into accumulatedGrads_. This closes the loop from the hook obtaining the gradient to the reduced gradient being written back.

```cpp
void DistAutogradContext::runGradCallbackForVariable(
    const torch::autograd::Variable& variable,
    GradCallback&& cb) {
  torch::Tensor grad;
  {
    std::lock_guard<std::mutex> guard(lock_);
    // Find the gradient grad corresponding to the tensor
    auto it = accumulatedGrads_.find(variable);
    TORCH_INTERNAL_ASSERT(
        it != accumulatedGrads_.end(),
        "The grad for the variable should exist in dist_autograd context.");
    grad = it->value();
  }
  if (cb(grad)) { // Process the gradient with the passed-in callback
    std::lock_guard<std::mutex> guard(lock_);
    auto device = grad.device();
    // Needs to update the grad in the map.
    // The processed gradient is copied back to accumulatedGrads_
    accumulatedGrads_.insert_or_assign(variable, std::move(grad));
    recordGradEvent(device);
  }
}
```

DistAutogradContext's accumulatedGrads_ records the current gradient corresponding to each tensor.

```cpp
// DistAutogradContext stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
 public:
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable.
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;
};
```
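The look-up / callback / write-back loop can be sketched in Python. This is a hypothetical illustration, not the C++ implementation: the plain dict, the string key, and the in-place division are assumptions made for the example.

```python
import torch

# Hypothetical sketch of the runGradCallbackForVariable flow: look up the
# accumulated grad, let a callback process it, and write the result back
# so later consumers see the processed value (the "closed loop").
accumulated_grads = {"w": torch.ones(3)}  # stand-in for accumulatedGrads_

def run_grad_callback(key, cb):
    grad = accumulated_grads[key]       # find the grad for this variable
    if cb(grad):                        # callback returns True if grad changed
        accumulated_grads[key] = grad   # write the processed grad back

def average_cb(grad):
    grad.div_(4)   # e.g. divide by a world size of 4, in place
    return True

run_grad_callback("w", average_cb)
print(accumulated_grads["w"].tolist())  # [0.25, 0.25, 0.25]
```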

So far, we have introduced some of the basic classes. We will continue in the next chapter (there are too many of them...).

## 0xFF reference

How does pytorch distributed series 2 - distributed data parallel synchronize?

https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20

https://pytorch.org/docs/stable/distributed.html

PyTorch source code interpretation of distributed training to understand?

Practical tutorial | PyTorch AutoGrad C++ layer implementation

PYTORCH automatic differentiation (I)

How does PyTorch accelerate data parallel training? Uncover the secrets of distributed Secrets

pytorch distributed training (II init_process_group)

https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

https://pytorch.org/docs/master/notes/ddp.html

https://pytorch.org/tutorials/intermediate/dist_tuto.html

Interpretation of PyTorch source code DP & DDP: model parallel and distributed training analysis