Harnessing 3200Gbps Network (9): Using 32 Network Cards
In the previous chapter, we worked out the system topology, and back in chapter 7 we achieved 97.844 Gbps of bandwidth on a single network card. In this chapter, we will use all 32 network cards, four per GPU across 8 GPUs, and see how much bandwidth our program can achieve. We'll name this program 9_multinet.cpp.
Network
We'll first make two small modifications to the Network class. First, we record the GPU device corresponding to each network card in cuda_device. Second, when opening a network card, we let all network cards share the same fabric object.
struct Network {
  // ...
  int cuda_device;
};

Network Network::Open(struct fi_info *fi, int cuda_device,
                      struct fid_fabric *fabric) {
  if (!fabric) {
    FI_CHECK(fi_fabric(fi->fabric_attr, &fabric, nullptr));
  }
  // ...
  return Network{fi, fabric, domain, cq, av, ep, addr, cuda_device};
}
Load Balancing
Because all our WRITE operations are the same size, load balancing is easy: we simply take turns among the four network cards attached to each GPU. To do this, we implement a simple NetworkGroup class that stores the four network cards and uses round-robin to select the next one.
struct NetworkGroup {
  std::vector<Network *> nets;
  uint8_t rr_mask;
  uint8_t rr_idx = 0;

  NetworkGroup(std::vector<Network *> &&nets) {
    CHECK(nets.size() <= kMaxNetworksPerGroup);
    CHECK((nets.size() & (nets.size() - 1)) == 0); // power of 2
    this->rr_mask = nets.size() - 1;
    this->nets = std::move(nets);
  }
  NetworkGroup(const NetworkGroup &) = delete;
  NetworkGroup(NetworkGroup &&) = default;

  uint8_t GetNext() {
    rr_idx = (rr_idx + 1) & rr_mask;
    return rr_idx;
  }
};
CONNECT Message
In previous programs, because we only used one network card, the client only needed to send one address to the server. Now that we use multiple network cards, the CONNECT message must carry the number of GPUs, the number of network cards, and the address and memory regions of each network card. As before, we put fixed-length data in the struct and variable-length data in memory right after it.
struct AppConnectMessage {
  AppMessageBase base;
  size_t num_gpus;
  size_t num_nets;
  size_t num_mr;

  EfaAddress &net_addr(size_t index) {
    CHECK(index < num_nets);
    return ((EfaAddress *)((uintptr_t)&base + sizeof(*this)))[index];
  }

  MemoryRegion &mr(size_t index) {
    CHECK(index < num_mr);
    return ((MemoryRegion *)((uintptr_t)&base + sizeof(*this) +
                             num_nets * sizeof(EfaAddress)))[index];
  }

  size_t MessageBytes() const {
    return sizeof(*this) + num_nets * sizeof(EfaAddress) +
           num_mr * sizeof(MemoryRegion);
  }
};
Server-Side Logic
When moving from one network card to 8 GPUs and 32 network cards, several variables in the server-side state machine need to change from scalars to arrays.
constexpr size_t kMaxNetworksPerGroup = 4;

struct RandomFillRequestState {
  std::vector<Network> *nets;
  std::vector<NetworkGroup> *net_groups;
  std::vector<Buffer> *cuda_bufs;
  std::vector<std::array<fi_addr_t, kMaxNetworksPerGroup>> remote_addrs;
  std::vector<WriteState> write_states;
  // ...
};
When receiving a CONNECT message, we need to ensure that the network cards on the server and client correspond one-to-one, and add each peer address to the matching address vector.
struct RandomFillRequestState {
  // ...
  void HandleConnect(Network &net, RdmaOp &op) {
    // ...
    // Assuming remote has the same number of GPUs and NICs.
    CHECK(msg.num_gpus == cuda_bufs->size());
    CHECK(msg.num_nets == nets->size());

    // Add peer addresses
    nets_per_gpu = msg.num_nets / msg.num_gpus;
    buf_per_gpu = connect_msg->num_mr / connect_msg->num_nets;
    for (size_t i = 0; i < msg.num_gpus; ++i) {
      std::array<fi_addr_t, kMaxNetworksPerGroup> addrs = {};
      for (size_t j = 0; j < nets_per_gpu; ++j) {
        auto idx = i * nets_per_gpu + j;
        addrs[j] = nets->at(idx).AddPeerAddress(msg.net_addr(idx));
      }
      remote_addrs.push_back(addrs);
    }

    // Initialize per-GPU write states
    write_states.resize(connect_msg->num_gpus);
  }
};
We also need to modify the ContinuePostWrite() function. It now takes a gpu_idx parameter so that it only posts WRITE operations on the network cards belonging to that GPU. First, we use NetworkGroup to select the next network card. Then we look up the client address dest_addr, memory region address mr.addr, and the memory region's remote access key mr.rkey for that network card. When setting the immediate data, we also need to check whether this is the last WRITE for the current network card (s.i_page + nets_per_gpu >= num_pages).
struct RandomFillRequestState {
  // ...
  void ContinuePostWrite(size_t gpu_idx) {
    auto &s = write_states[gpu_idx];
    if (s.i_repeat == total_repeat)
      return;
    auto page_size = request_msg->page_size;
    auto num_pages = request_msg->num_pages;
    auto net_idx = (*net_groups)[gpu_idx].GetNext();

    uint32_t imm_data = 0;
    if (s.i_repeat + 1 == total_repeat && s.i_buf + 1 == buf_per_gpu &&
        s.i_page + nets_per_gpu >= num_pages) {
      // The last WRITE. Pass remote context back.
      imm_data = request_msg->remote_context;
    }

    const auto &mr = connect_msg->mr(
        (gpu_idx * nets_per_gpu + net_idx) * buf_per_gpu + s.i_buf);
    (*net_groups)[gpu_idx].nets[net_idx]->PostWrite(
        RdmaWriteOp{
            .buf = &(*cuda_bufs)[gpu_idx],
            .offset = s.i_buf * (page_size * num_pages) + s.i_page * page_size,
            .len = page_size,
            .imm_data = imm_data,
            .dest_ptr = mr.addr + request_msg->page_idx(s.i_page) * page_size,
            .dest_addr = remote_addrs[gpu_idx][net_idx],
            .dest_key = mr.rkey},
        [this](Network &net, RdmaOp &op) { HandleWriteCompletion(); });
    ++posted_write_ops;

    if (++s.i_page == num_pages) {
      s.i_page = 0;
      if (++s.i_buf == buf_per_gpu) {
        s.i_buf = 0;
        if (++s.i_repeat == total_repeat)
          return;
      }
    }
  }
};
Server Main Program
In the server’s main program, we first detect the system topology, then open all network cards and group them into network groups.
int ServerMain(int argc, char **argv) {
  // Topology detection
  struct fi_info *info = GetInfo();
  auto topo_groups = DetectTopo(info);
  int num_gpus = topo_groups.size();
  int num_nets = topo_groups[0].fi_infos.size() * topo_groups.size();
  int nets_per_gpu = num_nets / num_gpus;

  // Open all network cards and group them by GPU.
  std::vector<Network> nets;
  std::vector<NetworkGroup> net_groups;
  nets.reserve(num_nets); // keep Network pointers stable across push_back
  for (int cuda_device = 0; cuda_device < num_gpus; ++cuda_device) {
    std::vector<Network *> group_nets;
    for (auto *fi : topo_groups[cuda_device].fi_infos) {
      // All network cards share the fabric object of the first one.
      auto *fabric = nets.empty() ? nullptr : nets[0].fabric;
      nets.push_back(Network::Open(fi, cuda_device, fabric));
      group_nets.push_back(&nets.back());
    }
    net_groups.push_back(NetworkGroup(std::move(group_nets)));
  }
  PrintTopologyGroups(topo_groups);
  // ...
}
Then we allocate a buffer on each GPU and register it with all the network cards corresponding to that GPU.
// Allocate and register CUDA memory
printf("Registered MR from");
std::vector<Buffer> cuda_bufs;
for (int i = 0; i < num_gpus; ++i) {
  CUDA_CHECK(cudaSetDevice(i));
  cuda_bufs.push_back(Buffer::AllocCuda(kMemoryRegionSize * 2, kBufAlign));
  for (int j = 0; j < nets_per_gpu; ++j) {
    nets[i * nets_per_gpu + j].RegisterMemory(cuda_bufs.back());
  }
  printf(" cuda:%d", i);
  fflush(stdout);
}
printf("\n");
We plan to use the first network card to receive client connections, so we allocate two message buffers and register them with the first network card only.
// Allocate and register message buffer
auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
nets[0].RegisterMemory(buf1);
nets[0].RegisterMemory(buf2);
In the main loop, we first post two RECV operations on the first network card. Until the state machine completes, we iterate over the GPUs. For each GPU, we first poll the completion queues of all its network cards; then, if the state machine is in the kWrite state, we post more WRITE operations.
// Loop forever. Accept one client at a time.
for (;;) {
  printf("------\n");
  // State machine
  RandomFillRequestState s(&nets, &net_groups, &cuda_bufs);
  // RECV for CONNECT
  nets[0].PostRecv(buf1,
                   [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
  // RECV for RandomFillRequest
  nets[0].PostRecv(buf2,
                   [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
  // Wait for completion
  while (s.state != RandomFillRequestState::State::kDone) {
    for (size_t gpu_idx = 0; gpu_idx < net_groups.size(); ++gpu_idx) {
      for (auto *net : net_groups[gpu_idx].nets) {
        net->PollCompletion();
      }
      switch (s.state) {
      case RandomFillRequestState::State::kWaitRequest:
        break;
      case RandomFillRequestState::State::kWrite:
        s.ContinuePostWrite(gpu_idx);
        break;
      case RandomFillRequestState::State::kDone:
        break;
      }
    }
  }
}
return 0;
Client-Side Logic
The client-side modifications mirror the server-side ones. First, detect the system topology, then open all network cards and group them into network groups. Next, allocate buffers on each GPU and register them with that GPU's network cards, and allocate the message buffers on the first network card. These details will not be repeated here.
Then we need to include the number of GPUs, the number of network cards, each network card's address, and each network card's memory regions in the CONNECT message, and send it to the server over the first network card.
int ClientMain(int argc, char **argv) {
  // ...
  // Send address to server
  auto &connect_msg = *(AppConnectMessage *)buf1.data;
  connect_msg = {
      .base = {.type = AppMessageType::kConnect},
      .num_gpus = (size_t)num_gpus,
      .num_nets = nets.size(),
      .num_mr = nets.size() * 2,
  };
  for (size_t i = 0; i < nets.size(); i++) {
    connect_msg.net_addr(i) = nets[i].addr;
    int cuda_device = nets[i].cuda_device;
    connect_msg.mr(i * 2) = {
        .addr = (uint64_t)cuda_bufs1[cuda_device].data,
        .size = cuda_bufs1[cuda_device].size,
        .rkey = nets[i].GetMR(cuda_bufs1[cuda_device])->key,
    };
    connect_msg.mr(i * 2 + 1) = {
        .addr = (uint64_t)cuda_bufs2[cuda_device].data,
        .size = cuda_bufs2[cuda_device].size,
        .rkey = nets[i].GetMR(cuda_bufs2[cuda_device])->key,
    };
  }

  bool connect_sent = false;
  nets[0].PostSend(
      server_addr, buf1, connect_msg.MessageBytes(),
      [&connect_sent](Network &net, RdmaOp &op) { connect_sent = true; });
  while (!connect_sent) {
    nets[0].PollCompletion();
  }
}
For the final REMOTE WRITE, the previous program only needed to wait for a single callback; now we need to wait for one callback on each network card.
// Prepare to receive the last REMOTE WRITE from server
int cnt_last_remote_write_received = 0;
uint32_t remote_write_op_id = 0x123;
for (auto &net : nets) {
  net.AddRemoteWrite(remote_write_op_id,
                     [&cnt_last_remote_write_received](Network &net,
                                                       RdmaOp &op) {
                       ++cnt_last_remote_write_received;
                     });
}

// Send message to server
// ...

// Wait for the last REMOTE WRITE from each network card
while (cnt_last_remote_write_received != num_nets) {
  for (auto &net : nets) {
    net.PollCompletion();
  }
}
printf("Received RDMA WRITE to local GPU memory.\n");
With this, we have completed all modifications.
Results
Running the program, we can see that we can indeed use 8 GPUs and 32 network cards. However, the transmission speed is only 287.089 Gbps, just 9.0% of the total 3200 Gbps bandwidth. In the following chapters, we will optimize the program step by step until we fully utilize the 3200 Gbps.
Chapter code: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/9_multinet.cpp