Harnessing 3200Gbps Network (13): State Sharding
In the previous chapter, we pinned the threads of the multi-threaded version to CPU cores, which boosted the transmission speed to 1237.738 Gbps. Unfortunately, that is still only 38.7% of the total 3200 Gbps bandwidth. Clearly, the CPU remains a bottleneck, and we need to optimize further.
Looking back at how we converted the program from single-threaded to multi-threaded, we changed some variables to atomic types to avoid race conditions between threads when reading and writing state. This included the posted_write_ops and finished_write_ops variables. Because we submit and complete millions of operations per second, operations on these two atomic variables are very frequent and might become a bottleneck.
Note that in our program, each thread is responsible for one GPU, and the threads make progress independently of each other. We don’t need an atomic increment for every operation submission and completion. In this chapter, we shard the state across threads so that, most of the time, a thread doesn’t need to read or write the state of other threads. We name the program in this chapter 13_shard.cpp.
State Sharding
We changed the posted_write_ops and finished_write_ops atomic variables to arrays, where each thread is only responsible for its own element. Additionally, we added an atomic variable cnt_finished_gpus to count the threads that have completed.
struct RandomFillRequestState {
  // ...
  std::array<uint64_t, 8> posted_write_ops;
  std::array<uint64_t, 8> finished_write_ops;
  std::atomic<size_t> cnt_finished_gpus = 0;
  // ...
};
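To make the before/after difference concrete, here is a minimal standalone sketch, not taken from 13_shard.cpp. The function names CountWithSharedAtomic and CountWithShards and the constant kNumGpus are hypothetical, and plain loops stand in for real operation completions. Both variants arrive at the same total; the sharded one just never performs an atomic read-modify-write in the loop.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

constexpr size_t kNumGpus = 8;  // assumption: one thread per GPU, 8 GPUs

// Before: every thread performs an atomic increment on one shared counter.
uint64_t CountWithSharedAtomic(uint64_t ops_per_thread) {
  std::atomic<uint64_t> finished_write_ops{0};
  std::vector<std::thread> workers;
  for (size_t gpu_idx = 0; gpu_idx < kNumGpus; ++gpu_idx) {
    workers.emplace_back([&finished_write_ops, ops_per_thread] {
      for (uint64_t n = 0; n < ops_per_thread; ++n) ++finished_write_ops;
    });
  }
  for (auto &t : workers) t.join();
  return finished_write_ops.load();
}

// After: each thread increments only its own array element.
uint64_t CountWithShards(uint64_t ops_per_thread) {
  std::array<uint64_t, kNumGpus> finished_write_ops{};  // zero-initialized
  std::vector<std::thread> workers;
  for (size_t gpu_idx = 0; gpu_idx < kNumGpus; ++gpu_idx) {
    workers.emplace_back([&finished_write_ops, gpu_idx, ops_per_thread] {
      for (uint64_t n = 0; n < ops_per_thread; ++n) {
        ++finished_write_ops[gpu_idx];
      }
    });
  }
  // join() synchronizes with each worker, so the reads below are race-free.
  for (auto &t : workers) t.join();
  uint64_t total = 0;
  for (uint64_t v : finished_write_ops) total += v;
  return total;
}
```

Calling either function with, say, 100000 operations per thread returns 8 times that number. Distinct array elements are distinct memory locations, so concurrent writes to different elements do not race with each other.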
Before the state machine transitions to kWrite, we need to initialize these two arrays.
struct RandomFillRequestState {
  // ...
  void HandleWarmupCompletion(Network &net, RdmaOp &op) {
    // ...
    // Prepare RDMA WRITE the data to remote GPU.
    printf("Started RDMA WRITE to the remote GPU memory.\n");
    // ...
    std::fill(posted_write_ops.begin(), posted_write_ops.end(), 0);
    std::fill(finished_write_ops.begin(), finished_write_ops.end(), 0);
    state = State::kWrite;
  }
};
When submitting a new WRITE operation, we only need to touch the state of the current thread.
struct RandomFillRequestState {
  // ...
  void ContinuePostWrite(size_t gpu_idx) {
    // ...
    group.nets[net_idx]->PostWrite(
        RdmaWriteOp{ ... },
        [this](Network &net, RdmaOp &op) { HandleWriteCompletion(net, op); });
    ++posted_write_ops[gpu_idx];
    // ...
  }
};
In the completion callback of the WRITE operation, we need to be a bit more careful. First, we increment finished_write_ops for the current thread. If the condition for printing progress is met, we read the progress of all threads and sum it up. Because the progress output doesn’t need to be perfectly accurate, we skip extra synchronization and directly perform dirty reads of other threads’ state. Furthermore, we can use the AVX2 instruction set to accelerate the summation.
On the other hand, if the number of completed operations for the current thread reaches its per-GPU total, we increment cnt_finished_gpus and check whether all threads have completed. If they have, we print the final progress and end the program. Because cnt_finished_gpus is an atomic variable, this check is correct. And since each thread reads and writes cnt_finished_gpus only after it has completed all of its operations, contention on this atomic variable does not degrade performance.
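#include <immintrin.h>  // AVX2 intrinsics; compile with -mavx2

// Sums eight uint64_t values using two 256-bit loads.
uint64_t SumU64x8AVX2(const std::array<uint64_t, 8> &arr) {
  // https://www.perplexity.ai/search/c-sum-over-a-std-array-uint64-p8.MLN_mQeOwoik79acOxg
  // Load elements 0..3 and add elements 4..7 lane by lane.
  __m256i sum_vec = _mm256_loadu_si256((const __m256i *)arr.data());
  sum_vec = _mm256_add_epi64(
      sum_vec, _mm256_loadu_si256((const __m256i *)(arr.data() + 4)));
  // Fold the upper 128-bit half onto the lower half, then add the two lanes.
  __m128i sum_128 = _mm_add_epi64(_mm256_extracti128_si256(sum_vec, 0),
                                  _mm256_extracti128_si256(sum_vec, 1));
  return _mm_extract_epi64(sum_128, 0) + _mm_extract_epi64(sum_128, 1);
}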
struct RandomFillRequestState {
  // ...
  void HandleWriteCompletion(Network &net, RdmaOp &op) {
    auto gpu_finished_ops = ++finished_write_ops[op.write.buf->cuda_device];
    if (gpu_finished_ops % 16384 == 0) {
      auto now = std::chrono::high_resolution_clock::now();
      auto posted = SumU64x8AVX2(posted_write_ops);
      auto finished = SumU64x8AVX2(finished_write_ops);
      PrintProgress(now, posted, finished);
    }
    if (gpu_finished_ops == total_write_ops_per_gpu) {
      if (++cnt_finished_gpus == connect_msg->num_gpus) {
        state = State::kDone;
        PrintProgress(std::chrono::high_resolution_clock::now(),
                      total_write_ops, total_write_ops);
        printf("\nFinished all RDMA WRITEs to the remote GPU memory.\n");
      }
    }
  }
};
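The completion logic can also be sketched in isolation, without the networking code. The following is a simplified illustration under stated assumptions, not the code from 13_shard.cpp: ShardedProgress, OnWriteCompletion, SumFinished, and RunCompletionDemo are hypothetical names, and a plain loop stands in for real WRITE completions. It also swaps in two things deliberately. The per-GPU slots are std::atomic<uint64_t> accessed only with relaxed loads and stores, which keeps the dirty-read idea while making the cross-thread reads well-defined in the C++ memory model (on x86-64 such loads and stores typically compile to ordinary moves, with no locked instruction). And the sum is a scalar loop rather than AVX2, so the sketch builds without extra compiler flags.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

constexpr size_t kMaxGpus = 8;  // assumption: at most 8 GPU threads

// Hypothetical, simplified stand-in for the sharded part of the request state.
struct ShardedProgress {
  std::array<std::atomic<uint64_t>, kMaxGpus> finished_write_ops;
  std::atomic<size_t> cnt_finished_gpus{0};
  size_t num_gpus = 0;
  uint64_t total_write_ops_per_gpu = 0;

  // Must be called before any worker thread starts.
  void Reset(size_t gpus, uint64_t ops_per_gpu) {
    assert(gpus <= kMaxGpus);
    for (auto &slot : finished_write_ops) {
      slot.store(0, std::memory_order_relaxed);
    }
    cnt_finished_gpus.store(0);
    num_gpus = gpus;
    total_write_ops_per_gpu = ops_per_gpu;
  }

  // Approximate while workers run: each slot is read independently.
  uint64_t SumFinished() const {
    uint64_t sum = 0;
    for (const auto &slot : finished_write_ops) {
      sum += slot.load(std::memory_order_relaxed);
    }
    return sum;
  }

  // Called only by the thread that owns gpu_idx. Returns true for the single
  // call that observes every GPU as finished.
  bool OnWriteCompletion(size_t gpu_idx) {
    auto &slot = finished_write_ops[gpu_idx];
    // Single writer per slot: a relaxed load plus store, not an atomic RMW.
    uint64_t finished = slot.load(std::memory_order_relaxed) + 1;
    slot.store(finished, std::memory_order_relaxed);
    if (finished == total_write_ops_per_gpu) {
      // The only contended atomic RMW: executed once per thread.
      return cnt_finished_gpus.fetch_add(1) + 1 == num_gpus;
    }
    return false;
  }
};

// Runs one thread per simulated GPU and returns how many calls reported
// "all GPUs finished".
size_t RunCompletionDemo(size_t gpus, uint64_t ops_per_gpu) {
  ShardedProgress progress;
  progress.Reset(gpus, ops_per_gpu);
  std::atomic<size_t> last_finishers{0};
  std::vector<std::thread> workers;
  for (size_t g = 0; g < gpus; ++g) {
    workers.emplace_back([&, g] {
      for (uint64_t n = 0; n < ops_per_gpu; ++n) {
        if (progress.OnWriteCompletion(g)) ++last_finishers;
      }
    });
  }
  for (auto &t : workers) t.join();
  // After join() the sum is exact.
  assert(progress.SumFinished() == gpus * ops_per_gpu);
  return last_finishers.load();
}
```

Whatever order the threads finish in, exactly one call sees the counter reach the number of GPUs, so RunCompletionDemo(8, 50000) reports a single last finisher. That is the property the final progress line and the transition to kDone rely on.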
That’s all the changes. By sharding the state, we avoid contention between threads on shared atomic counters in the hot path, which improves the program’s performance. Let’s see how it runs.
Results
From the video above, we can see that after sharding the state of different threads, the transmission speed reached 1522.567 Gbps, or 47.6% of the total 3200 Gbps bandwidth. Compared to the previous 1237.738 Gbps, that is a 23% improvement. This is a decent boost, but we’re still less than halfway to our goal, so we need to keep working.
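For readers who want to check the percentages, the arithmetic can be reproduced with a small helper. The function names below are hypothetical and only the three throughput figures come from this chapter.

```cpp
#include <cassert>
#include <cmath>
#include <cstdio>

// Share of the total link bandwidth that a measured speed achieves, in percent.
double UtilizationPercent(double gbps, double total_gbps) {
  return gbps / total_gbps * 100.0;
}

// Relative speedup of `after` over `before`, in percent.
double ImprovementPercent(double before_gbps, double after_gbps) {
  return (after_gbps / before_gbps - 1.0) * 100.0;
}

// Prints the figures for this chapter using the speeds reported in the text.
void PrintChapterSummary() {
  const double kTotalGbps = 3200.0;
  const double kPrevGbps = 1237.738;   // previous chapter
  const double kShardGbps = 1522.567;  // this chapter
  std::printf("utilization: %.1f%%\n",
              UtilizationPercent(kShardGbps, kTotalGbps));
  std::printf("improvement: %.1f%%\n",
              ImprovementPercent(kPrevGbps, kShardGbps));
}
```

1522.567 / 3200 is about 0.476, and 1522.567 / 1237.738 is about 1.23, which matches the 47.6% and 23% quoted above.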
Code for this chapter: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/13_shard.cpp