In the previous chapter, we implemented GPUDirect RDMA WRITE. However, that program submitted only 16 WRITE operations at a time. If we transmit more data, fi_writemsg() returns -FI_EAGAIN under network congestion, a case our program did not handle. In this chapter, we implement an operation queue that holds operations that cannot be submitted yet and submits them once the congestion clears. Along the way, we also measure the bandwidth our program can achieve. We’ll name this program 7_queue.cpp.

Operation Queue

First, we’ll add an operation queue to the Network class to store all pending operations.

struct Network {
  // ...
  std::deque<RdmaOp *> pending_ops;
};

Next, we’ll add a function that submits as many operations as possible, stopping when the queue is empty or a call returns -FI_EAGAIN.

void Network::ProgressPendingOps() {
  while (!pending_ops.empty()) {
    auto *op = pending_ops.front();
    pending_ops.pop_front();
    ssize_t ret = 0;
    switch (op->type) {
    case RdmaOpType::kRecv:
      ret = fi_recvmsg(...);
      break;
    case RdmaOpType::kSend:
      ret = fi_sendmsg(...);
      break;
    case RdmaOpType::kWrite:
      ret = fi_writemsg(...);
      break;
    case RdmaOpType::kRemoteWrite:
      CHECK(false); // Unreachable
      break;
    }
    if (ret == -FI_EAGAIN) {
      // Put it back to the front of the queue
      pending_ops.push_front(op);
      break;
    }
    if (ret) {
      // Unexpected error. Don't put it back.
      // Delete the op since it's not going to be in the completion queue.
      delete op;
      fprintf(stderr, "Failed to ProgressPendingOps. ret=%zd (%s)\n",
              ret, fi_strerror(-ret));
      fflush(stderr);
      break;
    }
  }
}
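
To see the drain-until-pushback behavior in isolation, here is a minimal sketch that runs without libfabric. FakeOp, PendingQueue, and the submit callback are hypothetical stand-ins, not part of 7_queue.cpp; the callback imitates fi_writemsg() by returning 0 or -EAGAIN. The sketch peeks at the front of the queue instead of popping and pushing back, which should behave the same way.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <deque>
#include <functional>

// Hypothetical stand-in for RdmaOp; only an id is needed here.
struct FakeOp {
  int id;
};

// Hypothetical queue that mirrors the shape of ProgressPendingOps().
// `submit` plays the role of fi_{recv,send,write}msg(): it returns 0 on
// success and -EAGAIN (standing in for -FI_EAGAIN) when the device cannot
// accept more work right now.
struct PendingQueue {
  std::deque<FakeOp> pending;
  std::function<int(const FakeOp &)> submit;

  // Submits operations in FIFO order until the queue is empty or the device
  // pushes back. Returns how many operations were accepted.
  std::size_t Progress() {
    assert(submit != nullptr);
    std::size_t accepted = 0;
    while (!pending.empty()) {
      int ret = submit(pending.front());
      if (ret == -EAGAIN) {
        break;  // Leave the op at the front and retry later.
      }
      pending.pop_front();  // Accepted, or failed for good: drop it either way.
      if (ret != 0) {
        break;  // Unexpected error: stop, as the listing above does.
      }
      ++accepted;
    }
    return accepted;
  }
};
```

For example, with a callback that accepts three operations and then returns -EAGAIN, queuing five operations and calling Progress() leaves two in the queue, still in their original order.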

Because the actual submission calls (i.e., fi_{recv,send,write}msg()) have moved into ProgressPendingOps(), our Post{Recv,Send,Write}() functions become very simple: each one puts the operation into the queue and then calls ProgressPendingOps().

void Network::PostRecv(...) {
  auto *op = new RdmaOp{ ... };
  pending_ops.push_back(op);
  ProgressPendingOps();
}

void Network::PostSend(...) {
  auto *op = new RdmaOp{ ... };
  pending_ops.push_back(op);
  ProgressPendingOps();
}

void Network::PostWrite(...) {
  auto *op = new RdmaOp{ ... };
  pending_ops.push_back(op);
  ProgressPendingOps();
}

Finally, we also need to modify the PollCompletion() function. When we receive some completion events, it means the network may have freed up some space, and we can try to submit more operations. Therefore, after processing the completion queue, we call ProgressPendingOps() again.

void Network::PollCompletion() {
  // Process completions
  struct fi_cq_data_entry cqe[kCompletionQueueReadCount];
  for (;;) {
    auto ret = fi_cq_read(cq, cqe, kCompletionQueueReadCount);
    // ...
  }

  // Try to make progress.
  ProgressPendingOps();
}

These are all the modifications to the Network class. With these changes, our network library can now handle network congestion.
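
The interplay between completions and resubmission can be sketched with a toy device that holds a fixed number of in-flight operations. FakeDevice, FakeNetwork, the depth of 4, and the two completions per poll are all assumptions made for illustration; a real provider decides these limits itself.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <deque>

// Hypothetical device with room for `depth` in-flight operations.
struct FakeDevice {
  std::size_t depth = 4;
  std::size_t in_flight = 0;

  int Submit() {
    assert(in_flight <= depth);
    if (in_flight == depth) return -EAGAIN;  // Full: try again later.
    ++in_flight;
    return 0;
  }

  // Pretends that up to `max_events` in-flight operations have finished.
  std::size_t ReadCompletions(std::size_t max_events) {
    std::size_t n = in_flight < max_events ? in_flight : max_events;
    in_flight -= n;
    return n;
  }
};

// Hypothetical miniature of the Network class: a queue plus a poll step.
struct FakeNetwork {
  FakeDevice dev;
  std::deque<int> pending_ops;
  std::size_t completed = 0;

  void ProgressPendingOps() {
    while (!pending_ops.empty() && dev.Submit() == 0) {
      pending_ops.pop_front();
    }
  }

  void Post(int op) {
    pending_ops.push_back(op);
    ProgressPendingOps();
  }

  void PollCompletion() {
    completed += dev.ReadCompletions(2);
    // Completions freed device slots, so retry whatever is still queued.
    ProgressPendingOps();
  }
};
```

Posting ten operations to a device of depth 4 leaves six queued; each PollCompletion() call then retires two and submits two more, so the queue only drains if the caller keeps polling.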

Server-Side Logic

In previous programs, the server submitted all WRITE operations at once and then returned to the main loop to process the completion queue. This approach does not work under network congestion. When -FI_EAGAIN appears, we must process the completion queue and wait for some earlier operations to complete. If we never process the completion queue, we cannot submit new operations even after the congestion dissipates. Therefore, we need to modify the server-side logic so that it alternates between submitting new operations and processing the completion queue.

To make our bandwidth test last longer, we will repeat each WRITE operation 500 times.

Now, let’s modify the server-side state machine. First, let’s redefine the members in the state machine structure. We need to add a State enumeration type to represent the state machine’s state; add a WriteState structure to store loop variables; and add some member variables to record the start time of the bandwidth test and current progress.

struct RandomFillRequestState {
  enum class State {
    kWaitRequest,
    kWrite,
    kDone,
  };

  struct WriteState {
    size_t i_repeat;
    size_t i_buf;
    size_t i_page;
  };

  Network *net;
  Buffer *cuda_buf;
  size_t total_bw = 0;
  State state = State::kWaitRequest;

  fi_addr_t client_addr = FI_ADDR_UNSPEC;
  AppConnectMessage *connect_msg = nullptr;
  AppRandomFillMessage *request_msg = nullptr;

  size_t total_repeat = 500;
  WriteState write_state;
  size_t total_write_ops = 0;
  size_t write_op_size = 0;
  size_t posted_write_ops = 0;
  size_t finished_write_ops = 0;
  std::chrono::time_point<std::chrono::high_resolution_clock> write_start_at;
};

When the server receives a RANDOM_FILL request, instead of submitting all WRITE operations at once, it will set the relevant variables and transition the state machine to the kWrite state.

struct RandomFillRequestState {
  // ...

  void HandleRequest(Network &net, RdmaOp &op) {
    // ...
    // Generate random data and copy to local GPU memory
    // ...

    // Prepare to RDMA WRITE the data to the remote GPU.
    total_write_ops =
        connect_msg->num_mr * request_msg->num_pages * total_repeat;
    posted_write_ops = 0;
    finished_write_ops = 0;
    write_op_size = request_msg->page_size;
    write_state = {.i_repeat = 0, .i_buf = 0, .i_page = 0};
    write_start_at = std::chrono::high_resolution_clock::now();
    state = State::kWrite;
    printf("Started RDMA WRITE to the remote GPU memory.\n");
  }
};
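
As a quick sanity check on the size of the test, the sketch below works out the number of WRITE operations and the bytes they move. The helper names are hypothetical, and num_mr = 1 is an assumed value chosen only for the example; the page size, page count, and repeat count are this chapter’s defaults.

```cpp
#include <cstdint>

// Hypothetical helper: one WRITE per (repeat, buffer, page) triple.
constexpr std::uint64_t TotalWriteOps(std::uint64_t num_mr,
                                      std::uint64_t num_pages,
                                      std::uint64_t total_repeat) {
  return num_mr * num_pages * total_repeat;
}

// Hypothetical helper: every WRITE moves exactly one page.
constexpr std::uint64_t TotalWriteBytes(std::uint64_t total_ops,
                                        std::uint64_t page_size) {
  return total_ops * page_size;
}

// Assumed inputs: num_mr = 1 (illustrative), 1000 pages, 500 repeats,
// 64 KiB pages.
static_assert(TotalWriteOps(1, 1000, 500) == 500000,
              "500,000 WRITE operations");
static_assert(TotalWriteBytes(500000, 65536) == 32768000000ULL,
              "32,768,000,000 bytes in total");
```

Under these assumptions the test issues 500,000 WRITEs that move 32,768,000,000 bytes, about 30.5 GiB.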

We’ll place the code for submitting WRITE operations in a new function:

struct RandomFillRequestState {
  // ...

  void ContinuePostWrite() {
    auto &s = write_state;
    if (s.i_repeat == total_repeat)
      return;
    auto page_size = request_msg->page_size;
    auto num_pages = request_msg->num_pages;

    uint32_t imm_data = 0;
    if (s.i_repeat + 1 == total_repeat && s.i_buf + 1 == connect_msg->num_mr &&
        s.i_page + 1 == num_pages) {
      // The last WRITE. Pass remote context back.
      imm_data = request_msg->remote_context;
    }
    net->PostWrite(
        RdmaWriteOp{ ... },
        [this](Network &net, RdmaOp &op) { HandleWriteCompletion(); });
    ++posted_write_ops;

    if (++s.i_page == num_pages) {
      s.i_page = 0;
      if (++s.i_buf == connect_msg->num_mr) {
        s.i_buf = 0;
        ++s.i_repeat;
      }
    }
  }
};
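
The counter updates at the end of ContinuePostWrite() are a nested loop unrolled into a resumable cursor, so that each call can do one step and return. A minimal sketch of that idea, with hypothetical Cursor and Bounds types standing in for WriteState and the request fields:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical copy of the three loop counters kept in WriteState.
struct Cursor {
  std::size_t i_repeat = 0;
  std::size_t i_buf = 0;
  std::size_t i_page = 0;
};

// Hypothetical loop bounds, mirroring total_repeat, num_mr, and num_pages.
struct Bounds {
  std::size_t total_repeat;
  std::size_t num_mr;
  std::size_t num_pages;
};

// True once every (repeat, buffer, page) triple has been visited.
inline bool Done(const Cursor &c, const Bounds &b) {
  return c.i_repeat == b.total_repeat;
}

// True when the cursor points at the final triple.
inline bool IsLast(const Cursor &c, const Bounds &b) {
  return c.i_repeat + 1 == b.total_repeat && c.i_buf + 1 == b.num_mr &&
         c.i_page + 1 == b.num_pages;
}

// Moves to the next triple: page is the innermost loop, repeat the outermost.
inline void Advance(Cursor &c, const Bounds &b) {
  assert(!Done(c, b));
  if (++c.i_page == b.num_pages) {
    c.i_page = 0;
    if (++c.i_buf == b.num_mr) {
      c.i_buf = 0;
      ++c.i_repeat;
    }
  }
}
```

With bounds of 2 repeats, 3 buffers, and 4 pages, calling Advance() until Done() visits 24 triples in the same order as three nested for loops, and IsLast() is true only for the final one.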

Each call to ContinuePostWrite() submits one more WRITE operation if any remain unsubmitted. Unlike the previous chapter, we set a callback function HandleWriteCompletion() for each WRITE operation. In this callback function, we output the current progress and bandwidth. When the last WRITE operation completes, we transition the state machine to the kDone state.

struct RandomFillRequestState {
  // ...

  void PrintProgress(...) { ... }

  void HandleWriteCompletion() {
    ++finished_write_ops;
    if (finished_write_ops % 16384 == 0) {
      auto now = std::chrono::high_resolution_clock::now();
      PrintProgress(now, posted_write_ops, finished_write_ops);
    }
    if (finished_write_ops == total_write_ops) {
      auto now = std::chrono::high_resolution_clock::now();
      PrintProgress(now, posted_write_ops, finished_write_ops);
      printf("\nFinished all RDMA WRITEs to the remote GPU memory.\n");
      state = State::kDone;
    }
  }
};
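
PrintProgress() is elided above. One way to turn the counters into a bandwidth figure, assuming the report is based on finished operations times write_op_size, is sketched below; BandwidthGbps is a hypothetical helper, not the author’s code.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper: converts bytes moved over an elapsed time into Gbps,
// where 1 Gbps is 10^9 bits per second.
inline double BandwidthGbps(std::uint64_t bytes,
                            std::chrono::duration<double> elapsed) {
  assert(elapsed.count() > 0.0);
  return static_cast<double>(bytes) * 8.0 / elapsed.count() / 1e9;
}
```

A call such as BandwidthGbps(finished_write_ops * write_op_size, now - write_start_at) would give the average rate since the test started.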

Finally, we modify the server-side main loop. In addition to processing the completion queue each time, we also check the state machine’s state. If the state machine is in the kWrite state, we continue to submit WRITE operations.

int ServerMain(int argc, char **argv) {
  // ...

  // Loop forever. Accept one client at a time.
  for (;;) {
    printf("------\n");
    // State machine
    RandomFillRequestState s(&net, &cuda_buf);
    // RECV for CONNECT
    net.PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // RECV for RandomFillRequest
    net.PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // Wait for completion
    while (s.state != RandomFillRequestState::State::kDone) {
      net.PollCompletion();
      switch (s.state) {
      case RandomFillRequestState::State::kWaitRequest:
        break;
      case RandomFillRequestState::State::kWrite:
        s.ContinuePostWrite();
        break;
      case RandomFillRequestState::State::kDone:
        break;
      }
    }
  }
  return 0;
}

Client-Side Logic

The client-side logic needs only small changes. We increase the GPU buffer size, reduce the default page size to 64 KiB, and increase the number of pages transmitted to 1000. Because more pages are transmitted, the RANDOM_FILL message is larger too, so we also enlarge the buffer used for sending and receiving.

constexpr size_t kMessageBufferSize = 1 << 20;
constexpr size_t kMemoryRegionSize = 1UL << 30;

int ClientMain(int argc, char **argv) {
  size_t page_size = 65536;
  size_t num_pages = 1000;
  // ...
}

Results

In our test run, the transmission speed reached 97.844 Gbps, almost fully utilizing the bandwidth.

Source code for this chapter: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/7_queue.cpp