In the previous chapter, we mapped out the system topology, and back in chapter 7 we achieved 97.844 Gbps of bandwidth on a single network card. In this chapter, we will use all 32 network cards, four for each of the 8 GPUs, and see how much bandwidth our program can achieve. We’ll name this program 9_multinet.cpp.

Network

We’ll first make two small modifications to the Network class. First, we’ll record the GPU device corresponding to each network card in a new cuda_device field. Second, when opening a network card, we’ll let all network cards share the same fabric object.

struct Network {
  // ...
  int cuda_device;
};

Network Network::Open(struct fi_info *fi, int cuda_device,
                      struct fid_fabric *fabric) {
  if (!fabric) {
    FI_CHECK(fi_fabric(fi->fabric_attr, &fabric, nullptr));
  }

  // ...

  return Network{fi, fabric, domain, cq, av, ep, addr, cuda_device};
}

Load Balancing

Because all our WRITE operations are the same size, load balancing is easy: we simply take turns among the four network cards corresponding to each GPU. To do this, we implement a simple NetworkGroup class that stores the four network cards and uses round-robin to select the next one.

struct NetworkGroup {
  std::vector<Network *> nets;
  uint8_t rr_mask;
  uint8_t rr_idx = 0;

  NetworkGroup(std::vector<Network *> &&nets) {
    CHECK(nets.size() <= kMaxNetworksPerGroup);
    CHECK((nets.size() & (nets.size() - 1)) == 0); // power of 2
    this->rr_mask = nets.size() - 1;
    this->nets = std::move(nets);
  }
  NetworkGroup(const NetworkGroup &) = delete;
  NetworkGroup(NetworkGroup &&) = default;

  uint8_t GetNext() {
    rr_idx = (rr_idx + 1) & rr_mask;
    return rr_idx;
  }
};
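
As a quick illustration, here is a minimal usage sketch (hypothetical code, not part of 9_multinet.cpp). Note that GetNext() increments before returning, so the rotation starts at index 1:

// Hypothetical sketch: cycling through a group of four network cards.
std::vector<Network *> group = {&nets[0], &nets[1], &nets[2], &nets[3]};
NetworkGroup g(std::move(group));
for (int i = 0; i < 8; ++i) {
  uint8_t idx = g.GetNext();   // yields 1, 2, 3, 0, 1, 2, 3, 0
  Network *net = g.nets[idx];  // take turns across the four cards
  // post the next WRITE on net ...
}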

CONNECT Message

In previous programs, because we only used one network card, the client only needed to send one address to the server. Now that we use multiple network cards, the CONNECT message needs to carry the number of GPUs, the number of network cards, and the address and memory regions of each network card. As before, we’ll put fixed-length data in the struct itself and variable-length data in memory right after the struct.

struct AppConnectMessage {
  AppMessageBase base;
  size_t num_gpus;
  size_t num_nets;
  size_t num_mr;

  EfaAddress &net_addr(size_t index) {
    CHECK(index < num_nets);
    return ((EfaAddress *)((uintptr_t)&base + sizeof(*this)))[index];
  }

  MemoryRegion &mr(size_t index) {
    CHECK(index < num_mr);
    return ((MemoryRegion *)((uintptr_t)&base + sizeof(*this) +
                             num_nets * sizeof(EfaAddress)))[index];
  }

  size_t MessageBytes() const {
    return sizeof(*this) + num_nets * sizeof(EfaAddress) +
           num_mr * sizeof(MemoryRegion);
  }
};
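
On the receiving side, a simple sanity check is to compare the number of bytes received against this layout before touching the variable-length fields. Below is a minimal sketch, assuming data points to a received message of len bytes (both names hypothetical):

// Sketch: validate a received CONNECT message before using its accessors.
auto &msg = *(AppConnectMessage *)data;
CHECK(len >= sizeof(AppConnectMessage));           // fixed-length part arrived
CHECK(msg.base.type == AppMessageType::kConnect);  // expected message type
CHECK(len == msg.MessageBytes());                  // variable-length tail arrived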

Server-Side Logic

When going from one network card to 8 GPUs and 32 network cards, several variables in the server-side state machine need to change from single values to arrays.

constexpr size_t kMaxNetworksPerGroup = 4;

struct RandomFillRequestState {
  std::vector<Network> *nets;
  std::vector<NetworkGroup> *net_groups;
  std::vector<Buffer> *cuda_bufs;
  std::vector<std::array<fi_addr_t, kMaxNetworksPerGroup>> remote_addrs;
  std::vector<WriteState> write_states;
  // ...
};

When receiving a CONNECT message, we need to make sure the network cards on the server and the client match up one-to-one, and add each peer address to the corresponding address vector.

struct RandomFillRequestState {
  // ...

  void HandleConnect(Network &net, RdmaOp &op) {
    // ... (parse the received buffer into msg and save it as connect_msg)

    // Assuming remote has the same number of GPUs and NICs.
    CHECK(msg.num_gpus == cuda_bufs->size());
    CHECK(msg.num_nets == nets->size());

    // Add peer addresses
    nets_per_gpu = msg.num_nets / msg.num_gpus;
    buf_per_gpu = connect_msg->num_mr / connect_msg->num_nets;
    for (size_t i = 0; i < msg.num_gpus; ++i) {
      std::array<fi_addr_t, kMaxNetworksPerGroup> addrs = {};
      for (size_t j = 0; j < nets_per_gpu; ++j) {
        auto idx = i * nets_per_gpu + j;
        addrs[j] = nets->at(idx).AddPeerAddress(msg.net_addr(idx));
      }
      remote_addrs.push_back(addrs);
    }

    // Initialize write states
    write_states.resize(connect_msg->num_gpus);
  }
};

We also need to modify the ContinuePostWrite() function. We’ll pass in a gpu_idx parameter so that each call only submits WRITE operations on the network cards belonging to that GPU. First, we use the GPU’s NetworkGroup to select the next network card. Then we look up that card’s client address dest_addr, memory region address mr.addr, and remote access key mr.rkey. When setting the immediate data, we also need to check whether this is the last WRITE operation on the current network card (s.i_page + nets_per_gpu >= num_pages): because pages are handed out round-robin, the last nets_per_gpu pages each go to a different card, so each card’s final WRITE is the one that carries the immediate data.

struct RandomFillRequestState {
  // ...

  void ContinuePostWrite(size_t gpu_idx) {
    auto &s = write_states[gpu_idx];
    if (s.i_repeat == total_repeat)
      return;
    auto page_size = request_msg->page_size;
    auto num_pages = request_msg->num_pages;

    auto net_idx = (*net_groups)[gpu_idx].GetNext();
    uint32_t imm_data = 0;
    if (s.i_repeat + 1 == total_repeat && s.i_buf + 1 == buf_per_gpu &&
        s.i_page + nets_per_gpu >= num_pages) {
      // The last WRITE. Pass remote context back.
      imm_data = request_msg->remote_context;
    }
    const auto &mr = connect_msg->mr(
        (gpu_idx * nets_per_gpu + net_idx) * buf_per_gpu + s.i_buf);
    (*net_groups)[gpu_idx].nets[net_idx]->PostWrite(
        RdmaWriteOp{.buf = &(*cuda_bufs)[gpu_idx],
         .offset = s.i_buf * (page_size * num_pages) + s.i_page * page_size,
         .len = page_size,
         .imm_data = imm_data,
         .dest_ptr = mr.addr + request_msg->page_idx(s.i_page) * page_size,
         .dest_addr = remote_addrs[gpu_idx][net_idx],
         .dest_key = mr.rkey},
        [this](Network &net, RdmaOp &op) { HandleWriteCompletion(); });
    ++posted_write_ops;

    if (++s.i_page == num_pages) {
      s.i_page = 0;
      if (++s.i_buf == buf_per_gpu) {
        s.i_buf = 0;
        if (++s.i_repeat == total_repeat)
          return;
      }
    }
  }
};
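
To see why s.i_page + nets_per_gpu >= num_pages marks the last WRITE on each network card, here is a standalone toy program with hypothetical numbers (one buffer, one repeat): the final nets_per_gpu pages each land on a distinct card, so every card gets exactly one WRITE carrying the immediate data.

#include <cstddef>
#include <cstdio>

int main() {
  // Toy illustration: 8 pages round-robined over 4 network cards.
  constexpr size_t num_pages = 8, nets_per_gpu = 4;
  for (size_t i_page = 0; i_page < num_pages; ++i_page) {
    size_t net_idx = (i_page + 1) % nets_per_gpu; // GetNext() starts at 1
    bool last_on_card = i_page + nets_per_gpu >= num_pages;
    printf("page %zu -> card %zu%s\n", i_page, net_idx,
           last_on_card ? " (carries imm_data)" : "");
  }
  // Pages 4..7 map to cards 1, 2, 3, 0 -- one final WRITE per card.
  return 0;
}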

Server Main Program

In the server’s main program, we first detect the system topology, then open all network cards and group them into network groups.

int ServerMain(int argc, char **argv) {
  // Topology detection
  struct fi_info *info = GetInfo();
  auto topo_groups = DetectTopo(info);
  int num_gpus = topo_groups.size();
  int num_nets = topo_groups[0].fi_infos.size() * topo_groups.size();
  int nets_per_gpu = num_nets / num_gpus;

  // Open Network
  std::vector<Network> nets;
  std::vector<NetworkGroup> net_groups;
  nets.reserve(num_nets); // keep Network* in net_groups valid across push_back
  for (int cuda_device = 0; cuda_device < num_gpus; ++cuda_device) {
    std::vector<Network *> group_nets;
    for (auto *fi : topo_groups[cuda_device].fi_infos) {
      auto *fabric = nets.empty() ? nullptr : nets[0].fabric;
      nets.push_back(Network::Open(fi, cuda_device, fabric));
      group_nets.push_back(&nets.back());
    }
    net_groups.push_back(NetworkGroup(std::move(group_nets)));
  }
  PrintTopologyGroups(topo_groups);

  // ...
}

Then we allocate a buffer on each GPU and register this memory region with all network cards corresponding to that GPU. Each network card has its own libfabric domain, so the same buffer must be registered separately with every card that will access it, and each registration yields its own remote key.

  // Allocate and register CUDA memory
  printf("Registered MR from");
  std::vector<Buffer> cuda_bufs;
  for (int i = 0; i < num_gpus; ++i) {
    CUDA_CHECK(cudaSetDevice(i));
    cuda_bufs.push_back(Buffer::AllocCuda(kMemoryRegionSize * 2, kBufAlign));
    for (int j = 0; j < nets_per_gpu; ++j) {
      nets[i * nets_per_gpu + j].RegisterMemory(cuda_bufs.back());
    }
    printf(" cuda:%d", i);
    fflush(stdout);
  }
  printf("\n");

We plan to use the first network card to receive client connections, so we allocate two message buffers and register them with the first network card.

  // Allocate and register message buffer
  auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  nets[0].RegisterMemory(buf1);
  nets[0].RegisterMemory(buf2);

In the main loop, we first post two RECV operations on the first network card. Then, until the state machine reaches the done state, we iterate over each GPU: we first poll the completion queues of all of its network cards, and then, if the state machine is in the kWrite state, we submit more WRITE operations.

  // Loop forever. Accept one client at a time.
  for (;;) {
    printf("------\n");
    // State machine
    RandomFillRequestState s(&nets, &net_groups, &cuda_bufs);
    // RECV for CONNECT
    nets[0].PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // RECV for RandomFillRequest
    nets[0].PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // Wait for completion
    while (s.state != RandomFillRequestState::State::kDone) {
      for (size_t gpu_idx = 0; gpu_idx < net_groups.size(); ++gpu_idx) {
        for (auto *net : net_groups[gpu_idx].nets) {
          net->PollCompletion();
        }
        switch (s.state) {
        case RandomFillRequestState::State::kWaitRequest:
          break;
        case RandomFillRequestState::State::kWrite:
          s.ContinuePostWrite(gpu_idx);
          break;
        case RandomFillRequestState::State::kDone:
          break;
        }
      }
    }
  }

  return 0;
}

Client-Side Logic

The client-side modifications are similar to the server-side ones. First, detect the system topology, then open all network cards and group them into network groups. Then allocate buffers on each GPU and register them with the GPU’s corresponding network cards. The message buffers are allocated and registered with the first network card. These details will not be repeated here.

Then we need to include the number of GPUs, the number of network cards, the address of each network card, and the memory regions registered on each network card in the CONNECT message, and send this message to the server through the first network card.

int ClientMain(int argc, char **argv) {
  // ...

  // Send address to server
  auto &connect_msg = *(AppConnectMessage *)buf1.data;
  connect_msg = {
      .base = {.type = AppMessageType::kConnect},
      .num_gpus = (size_t)num_gpus,
      .num_nets = nets.size(),
      .num_mr = nets.size() * 2,
  };
  for (size_t i = 0; i < nets.size(); i++) {
    connect_msg.net_addr(i) = nets[i].addr;
    int cuda_device = nets[i].cuda_device;
    connect_msg.mr(i * 2) = {
        .addr = (uint64_t)cuda_bufs1[cuda_device].data,
        .size = cuda_bufs1[cuda_device].size,
        .rkey = nets[i].GetMR(cuda_bufs1[cuda_device])->key,
    };
    connect_msg.mr(i * 2 + 1) = {
        .addr = (uint64_t)cuda_bufs2[cuda_device].data,
        .size = cuda_bufs2[cuda_device].size,
        .rkey = nets[i].GetMR(cuda_bufs2[cuda_device])->key,
    };
  }
  bool connect_sent = false;
  nets[0].PostSend(
      server_addr, buf1, connect_msg.MessageBytes(),
      [&connect_sent](Network &net, RdmaOp &op) { connect_sent = true; });
  while (!connect_sent) {
    nets[0].PollCompletion();
  }
  // ...
}

Previously, the program only needed to wait for a single REMOTE WRITE callback; now we need to wait for one callback on each network card.

  // Prepare to receive the last REMOTE WRITE from server
  int cnt_last_remote_write_received = 0;
  uint32_t remote_write_op_id = 0x123;
  for (auto &net : nets) {
    net.AddRemoteWrite(remote_write_op_id, [&cnt_last_remote_write_received](
                                               Network &net, RdmaOp &op) {
      ++cnt_last_remote_write_received;
    });
  }

  // Send message to server
  // ...

  // Wait for REMOTE WRITE from server
  while (cnt_last_remote_write_received != num_nets) {
    for (auto &net : nets) {
      net.PollCompletion();
    }
  }
  printf("Received RDMA WRITE to local GPU memory.\n");

With this, we have completed all modifications.

Results

Running the program, we can see that we can indeed use 8 GPUs and 32 network cards. However, the transmission speed is only 287.089 Gbps, just 9.0% of the total 3200 Gbps bandwidth. Next, we will optimize our program step by step until we can fully utilize the 3200 Gbps bandwidth.

Chapter code: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/9_multinet.cpp