Harnessing 3200Gbps Network (7): Queuing and Benchmark
In the previous chapter, we implemented GPUDirect RDMA WRITE. However, that program only submitted 16 WRITE operations at once. If we transmit more data, `fi_writemsg()` would return `-FI_EAGAIN` due to network congestion, and our program did not handle this situation. In this chapter, we will implement an operation queue that temporarily stores operations that cannot be submitted yet, waiting for the congestion to dissipate before submitting them. Along the way, we can also measure the bandwidth our program achieves. We'll name this program `7_queue.cpp`.
Operation Queue
First, we'll add an operation queue to the `Network` class to store all pending operations.
```cpp
struct Network {
  // ...
  std::deque<RdmaOp *> pending_ops;
};
```
Next, we'll add a function that submits as many operations as possible, until either all operations are submitted or `-FI_EAGAIN` is encountered.
```cpp
void Network::ProgressPendingOps() {
  while (!pending_ops.empty()) {
    auto *op = pending_ops.front();
    pending_ops.pop_front();
    ssize_t ret = 0;
    switch (op->type) {
    case RdmaOpType::kRecv:
      ret = fi_recvmsg(...);
      break;
    case RdmaOpType::kSend:
      ret = fi_sendmsg(...);
      break;
    case RdmaOpType::kWrite:
      ret = fi_writemsg(...);
      break;
    case RdmaOpType::kRemoteWrite:
      CHECK(false);  // Unreachable
      break;
    }
    if (ret == -FI_EAGAIN) {
      // Put it back to the front of the queue
      pending_ops.push_front(op);
      break;
    }
    if (ret) {
      // Unexpected error. Don't put it back.
      // Delete the op since it's not going to be in the completion queue.
      delete op;
      fprintf(stderr, "Failed to ProgressPendingOps. ret=%ld (%s)\n", ret,
              fi_strerror(-ret));
      fflush(stderr);
      break;
    }
  }
}
```
Because the main body of operation submission (i.e., `fi_{recv,send,write}msg()`) has moved into `ProgressPendingOps()`, our `Post{Recv,Send,Write}()` functions become very simple: they just put the operation into the queue and then call `ProgressPendingOps()`.
```cpp
void Network::PostRecv(...) {
  auto *op = new RdmaOp{...};
  pending_ops.push_back(op);
  ProgressPendingOps();
}

void Network::PostSend(...) {
  auto *op = new RdmaOp{...};
  pending_ops.push_back(op);
  ProgressPendingOps();
}

void Network::PostWrite(...) {
  auto *op = new RdmaOp{...};
  pending_ops.push_back(op);
  ProgressPendingOps();
}
```
Finally, we also need to modify the `PollCompletion()` function. When we receive completion events, the network may have freed up some space, so we can try to submit more operations. Therefore, after processing the completion queue, we call `ProgressPendingOps()` again.
```cpp
void Network::PollCompletion() {
  // Process completions
  struct fi_cq_data_entry cqe[kCompletionQueueReadCount];
  for (;;) {
    auto ret = fi_cq_read(cq, cqe, kCompletionQueueReadCount);
    // ...
  }
  // Try to make progress.
  ProgressPendingOps();
}
```
These are all the modifications to the `Network` class. With these changes, our network library can now handle network congestion.
Server-Side Logic
In previous programs, the server submitted all WRITE operations at once and then returned to the main loop to process the completion queue. This approach breaks down under network congestion: once `-FI_EAGAIN` appears, we must wait for some earlier operations to complete and process the completion queue, because if we never drain the completion queue, we cannot submit new operations even after the congestion dissipates. Therefore, we need to modify the server-side logic to alternate between submitting new operations and processing the completion queue.
To make our bandwidth test last longer, we will repeat each WRITE operation 500 times.
Now, let's modify the server-side state machine. First, let's redefine the members of the state machine structure: we add a `State` enumeration to represent the state machine's state, a `WriteState` structure to hold the loop variables, and a few member variables to record the start time of the bandwidth test and the current progress.
```cpp
struct RandomFillRequestState {
  enum class State {
    kWaitRequest,
    kWrite,
    kDone,
  };
  struct WriteState {
    size_t i_repeat;
    size_t i_buf;
    size_t i_page;
  };

  Network *net;
  Buffer *cuda_buf;
  size_t total_bw = 0;
  State state = State::kWaitRequest;
  fi_addr_t client_addr = FI_ADDR_UNSPEC;
  AppConnectMessage *connect_msg = nullptr;
  AppRandomFillMessage *request_msg = nullptr;
  size_t total_repeat = 500;
  WriteState write_state;
  size_t total_write_ops = 0;
  size_t write_op_size = 0;
  size_t posted_write_ops = 0;
  size_t finished_write_ops = 0;
  std::chrono::time_point<std::chrono::high_resolution_clock> write_start_at;
};
```
When the server receives a `RANDOM_FILL` request, instead of submitting all WRITE operations at once, it sets the relevant variables and transitions the state machine to the `kWrite` state.
```cpp
struct RandomFillRequestState {
  // ...
  void HandleRequest(Network &net, RdmaOp &op) {
    // ...
    // Generate random data and copy to local GPU memory
    // ...

    // Prepare RDMA WRITE the data to remote GPU.
    total_write_ops =
        connect_msg->num_mr * request_msg->num_pages * total_repeat;
    posted_write_ops = 0;
    finished_write_ops = 0;
    write_op_size = request_msg->page_size;
    write_state = {.i_repeat = 0, .i_buf = 0, .i_page = 0};
    write_start_at = std::chrono::high_resolution_clock::now();
    state = State::kWrite;
    printf("Started RDMA WRITE to the remote GPU memory.\n");
  }
};
```
We'll place the code for submitting WRITE operations in a new function:
```cpp
struct RandomFillRequestState {
  // ...
  void ContinuePostWrite() {
    auto &s = write_state;
    if (s.i_repeat == total_repeat)
      return;
    auto page_size = request_msg->page_size;
    auto num_pages = request_msg->num_pages;

    uint32_t imm_data = 0;
    if (s.i_repeat + 1 == total_repeat && s.i_buf + 1 == connect_msg->num_mr &&
        s.i_page + 1 == num_pages) {
      // The last WRITE. Pass remote context back.
      imm_data = request_msg->remote_context;
    }

    net->PostWrite(RdmaWriteOp{...}, [this](Network &net, RdmaOp &op) {
      HandleWriteCompletion();
    });
    ++posted_write_ops;

    if (++s.i_page == num_pages) {
      s.i_page = 0;
      if (++s.i_buf == connect_msg->num_mr) {
        s.i_buf = 0;
        ++s.i_repeat;
      }
    }
  }
};
```
In `ContinuePostWrite()`, if there are WRITE operations that have not yet been submitted, we submit one new WRITE operation. Unlike the previous chapter, we set a callback `HandleWriteCompletion()` for each WRITE operation. In this callback we output the current progress and bandwidth, and when the last WRITE operation completes, we transition the state machine to the `kDone` state.
```cpp
struct RandomFillRequestState {
  // ...
  void PrintProgress(...) { ... }

  void HandleWriteCompletion() {
    ++finished_write_ops;
    if (finished_write_ops % 16384 == 0) {
      auto now = std::chrono::high_resolution_clock::now();
      PrintProgress(now, posted_write_ops, finished_write_ops);
    }
    if (finished_write_ops == total_write_ops) {
      auto now = std::chrono::high_resolution_clock::now();
      PrintProgress(now, posted_write_ops, finished_write_ops);
      printf("\nFinished all RDMA WRITEs to the remote GPU memory.\n");
      state = State::kDone;
    }
  }
};
```
Finally, we modify the server-side main loop. In addition to processing the completion queue on each iteration, we also check the state machine's state. If it is in the `kWrite` state, we continue submitting WRITE operations.
```cpp
int ServerMain(int argc, char **argv) {
  // ...
  // Loop forever. Accept one client at a time.
  for (;;) {
    printf("------\n");
    // State machine
    RandomFillRequestState s(&net, &cuda_buf);
    // RECV for CONNECT
    net.PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // RECV for RandomFillRequest
    net.PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // Wait for completion
    while (s.state != RandomFillRequestState::State::kDone) {
      net.PollCompletion();
      switch (s.state) {
      case RandomFillRequestState::State::kWaitRequest:
        break;
      case RandomFillRequestState::State::kWrite:
        s.ContinuePostWrite();
        break;
      case RandomFillRequestState::State::kDone:
        break;
      }
    }
  }
  return 0;
}
```
Client-Side Logic
The client-side logic does not require major modifications. The only small changes are that we increase the GPU buffer size to 1 GiB, reduce the default page size to 64 KiB, and increase the number of pages transmitted to 1000. Since more pages are transmitted, the `RANDOM_FILL` message grows as well, so we also increase the size of the buffers used for receiving and sending.
```cpp
constexpr size_t kMessageBufferSize = 1 << 20;
constexpr size_t kMemoryRegionSize = 1UL << 30;

int ClientMain(int argc, char **argv) {
  size_t page_size = 65536;
  size_t num_pages = 1000;
  // ...
}
```
Results
As the output shows, the transmission speed reached 97.844 Gbps, almost fully utilizing the 100 Gbps bandwidth of a single network interface.
Source code for this chapter: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/7_queue.cpp