Harnessing 3200Gbps Network (5): Bidirectional SEND and RECV
In the previous chapter, we implemented unidirectional receive and send. In this chapter, we will extend that program to implement bidirectional receive and send. After the server receives a message from the client, it will reverse the message and send it back to the client. We’ll name this program 5_reverse.cpp.
Different Types of Messages
To allow the server to send a message to the client, the server must first know the client’s address. Therefore, we’ll first have the client send its address to the server, then send a message to the server, and finally receive a message back from the server. To distinguish different types of messages, we’ll add a type field to the message header.
enum class AppMessageType : uint8_t {
  kConnect = 0,
  kData = 1,
};

struct AppMessageBase {
  AppMessageType type;
};

struct AppConnectMessage {
  AppMessageBase base;
  EfaAddress client_addr;
};

struct AppDataMessage {
  AppMessageBase base;
  // Data follows
};
Note that the AppDataMessage structure doesn’t define a length field. The sender specifies the buffer length when it submits the SEND, and the receiver obtains the received length when the RECV completes. Subtracting the header length gives the data length.
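The length arithmetic described above can be sketched as follows. This is a minimal illustration, not code from the chapter: the helper names DataMessageSize and DataPayloadLength are hypothetical, and the message structs are repeated only so that the snippet compiles on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Repeated from the definitions above so the sketch is self-contained.
enum class AppMessageType : uint8_t { kConnect = 0, kData = 1 };
struct AppMessageBase { AppMessageType type; };
struct AppDataMessage { AppMessageBase base; /* payload follows */ };

// Hypothetical helper: total number of bytes to send for a payload of
// `payload_len` bytes (header plus payload).
inline size_t DataMessageSize(size_t payload_len) {
  return sizeof(AppDataMessage) + payload_len;
}

// Hypothetical helper: payload length recovered from the size reported
// when the receive completes.
inline size_t DataPayloadLength(size_t recv_size) {
  assert(recv_size >= sizeof(AppDataMessage));  // must at least hold the header
  return recv_size - sizeof(AppDataMessage);
}
```

With a one-byte header, a 13-byte payload such as "Hello, world!" travels as a 14-byte message, and the receiver recovers 13 by subtracting the header size.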
Why not merge the CONNECT and DATA messages into one?
- The server doesn’t need to send a CONNECT message to the client, so there’s no need to include CONNECT content in the DATA message.
- Separating these makes the communication logic clearer.
- The client can send multiple DATA messages without sending a CONNECT message each time.
- In later chapters, we’ll expand the CONNECT message to include more information.
Server-Side Logic
Because the client will send two messages, the server needs to maintain a state machine to handle different types of messages.
struct ReverseRequestState {
  fi_addr_t client_addr = FI_ADDR_UNSPEC;
  bool done = false;
  // ...
};
When receiving a CONNECT message, the server will add the client’s address to its address vector and save the corresponding fi_addr_t in the state machine.
struct ReverseRequestState {
  // ...
  void HandleConnect(Network &net, RdmaOp &op) {
    auto *base_msg = (const AppMessageBase *)op.recv.buf->data;
    CHECK(base_msg->type == AppMessageType::kConnect);
    CHECK(op.recv.recv_size == sizeof(AppConnectMessage));
    auto *msg = (const AppConnectMessage *)base_msg;
    printf("Received CONNECT message from client: %s\n",
           msg->client_addr.ToString().c_str());
    client_addr = net.AddPeerAddress(msg->client_addr);
  }
};
When receiving a DATA message, the server will reverse the message and send it back to the client. Once the reply has been sent, it will set the state machine’s done to true.
struct ReverseRequestState {
  // ...
  void HandleData(Network &net, RdmaOp &op) {
    auto *base_msg = (const AppMessageBase *)op.recv.buf->data;
    CHECK(base_msg->type == AppMessageType::kData);
    auto *msg = (uint8_t *)op.recv.buf->data + sizeof(*base_msg);
    auto len = op.recv.recv_size - sizeof(*base_msg);
    printf("Received message (len=%zu): %.*s\n", len, (int)len, msg);
    // Reverse the payload in place. j is one past the last unswapped byte,
    // so an empty payload (len == 0) does not underflow.
    for (size_t i = 0, j = len; i + 1 < j; ++i, --j) {
      auto t = msg[i];
      msg[i] = msg[j - 1];
      msg[j - 1] = t;
    }
    // Send the header plus the reversed payload back to the client
    net.PostSend(client_addr, *op.recv.buf, op.recv.recv_size,
                 [this](Network &net, RdmaOp &op) {
                   printf("Sent reversed message to client\n");
                   done = true;
                 });
  }
};
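As an aside, the in-place reversal could also be written with std::reverse from the standard library instead of a hand-written swap loop. The sketch below is an alternative, not the chapter’s code: ReversePayloadInPlace is a hypothetical helper that takes the raw buffer, the received size, and the header size, and leaves the header bytes untouched.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: reverse only the payload that follows the header.
// `buf` points at the start of the received message, `recv_size` is the
// total number of received bytes, `header_size` is the header length.
inline void ReversePayloadInPlace(uint8_t *buf, size_t recv_size,
                                  size_t header_size) {
  assert(recv_size >= header_size);  // message must contain the header
  // std::reverse on an empty range is a no-op, so a header-only
  // message needs no special case.
  std::reverse(buf + header_size, buf + recv_size);
}
```

Because the header stays in place, the same buffer and the same length can be handed straight back to the send call, which is what HandleData does.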
Although we only process one client’s request at a time, we still need to use two buffers and submit two RECV operations in advance. If we waited until the CONNECT message arrived before submitting the second RECV, the client’s DATA message might be sent before that RECV is submitted, and the data would be lost because the server would have no RECV operation waiting for it.
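The timing problem can be illustrated with a toy model. This is only a sketch under the same assumption the text makes, namely that a message arriving while no RECV is waiting is lost. ToyEndpoint and DroppedWithPrepostedRecvs are hypothetical names, and nothing here describes how libfabric or EFA work internally.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy model only: an endpoint that can accept a message only if a receive
// was posted beforehand. Not a description of real provider behavior.
struct ToyEndpoint {
  size_t posted_recvs = 0;
  size_t dropped = 0;
  std::vector<std::string> received;

  void PostRecv() { ++posted_recvs; }

  // Returns true if the message was matched with a posted receive.
  bool Deliver(const std::string &msg) {
    if (posted_recvs == 0) {
      ++dropped;  // nobody is waiting for this message
      return false;
    }
    --posted_recvs;
    received.push_back(msg);
    return true;
  }
};

// Simulates a client whose CONNECT and DATA both arrive before the server
// has had a chance to react to CONNECT. Returns the number of lost messages.
inline size_t DroppedWithPrepostedRecvs(size_t preposted) {
  ToyEndpoint server;
  for (size_t i = 0; i < preposted; ++i) server.PostRecv();
  server.Deliver("CONNECT");
  server.Deliver("DATA");
  assert(server.received.size() + server.dropped == 2);
  return server.dropped;
}
```

In this model, one pre-posted receive loses the DATA message, while two pre-posted receives lose nothing.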
Since these two RECV operations are submitted simultaneously, we can’t specify different callback functions to distinguish the state machine’s state. Instead, we use the same callback function for both and decide the next step inside it based on the state machine’s state.
struct ReverseRequestState {
  // ...
  void OnRecv(Network &net, RdmaOp &op) {
    if (client_addr == FI_ADDR_UNSPEC) {
      HandleConnect(net, op);
    } else {
      HandleData(net, op);
    }
  }
};
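To see the dispatch in isolation, here is a stripped-down mock of the same idea that runs without any networking. It is only a sketch: ToyReverseState and kAddrUnspec are hypothetical stand-ins for ReverseRequestState and FI_ADDR_UNSPEC, and the handlers just record which branch ran.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stand-in for FI_ADDR_UNSPEC in this mock.
constexpr uint64_t kAddrUnspec = ~0ULL;

struct ToyReverseState {
  uint64_t client_addr = kAddrUnspec;
  std::vector<std::string> log;  // which handler ran, in order

  void HandleConnect(uint64_t addr) {
    assert(client_addr == kAddrUnspec);  // only valid before CONNECT
    client_addr = addr;
    log.push_back("connect");
  }

  void HandleData() {
    assert(client_addr != kAddrUnspec);  // only valid after CONNECT
    log.push_back("data");
  }

  // Single callback shared by both receives: the stored address tells us
  // whether the CONNECT message has already been handled.
  void OnRecv(uint64_t addr_if_connect) {
    if (client_addr == kAddrUnspec) {
      HandleConnect(addr_if_connect);
    } else {
      HandleData();
    }
  }
};
```

Calling OnRecv twice on a fresh state runs the CONNECT handler first and the DATA handler second, regardless of which of the two receives completed.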
Finally, here’s the complete server-side logic:
int ServerMain(int argc, char **argv) {
  struct fi_info *info = GetInfo();
  auto net = Network::Open(info);

  // Allocate and register memory
  auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf1);
  auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf2);

  // Loop forever. Accept one client at a time.
  for (;;) {
    // State machine
    ReverseRequestState s;
    // RECV for CONNECT and DATA
    net.PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    net.PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // Wait for completion
    while (!s.done) {
      net.PollCompletion();
    }
  }

  return 0;
}
Client-Side Logic
The client’s logic is also straightforward. First, we open the network interface and register two buffers.
int ClientMain(int argc, char **argv) {
  CHECK(argc == 2 || argc == 3);
  auto server_addrname = EfaAddress::Parse(argv[1]);
  std::string message = argc == 3 ? argv[2] : "Hello, world!";

  struct fi_info *info = GetInfo();
  auto net = Network::Open(info);
  auto server_addr = net.AddPeerAddress(server_addrname);

  auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf1);
  auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf2);

  // ...
}
Next, send a CONNECT message to the server.
  // Send address to server
  auto *connect_msg = (AppConnectMessage *)buf1.data;
  connect_msg->base.type = AppMessageType::kConnect;
  connect_msg->client_addr = net.addr;
  bool connect_sent = false;
  net.PostSend(server_addr, buf1, sizeof(*connect_msg),
               [&connect_sent](Network &net, RdmaOp &op) {
                 printf("Sent CONNECT message to server\n");
                 connect_sent = true;
               });
  while (!connect_sent) {
    net.PollCompletion();
  }
Then send a DATA message to the server. To make sure we can receive the server’s reply, we need to submit a RECV operation before submitting the SEND operation.
  // Prepare to receive reversed message from server
  bool msg_received = false;
  net.PostRecv(buf2, [&msg_received](Network &net, RdmaOp &op) {
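    // The server sends back header + reversed payload, so skip the header
    auto *msg = (const char *)op.recv.buf->data + sizeof(AppDataMessage);
    auto len = op.recv.recv_size - sizeof(AppDataMessage);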
    printf("Received message (len=%zu): %.*s\n", len, (int)len, msg);
    msg_received = true;
  });

  // Send message to server
  auto *data_msg = (AppDataMessage *)buf1.data;
  data_msg->base.type = AppMessageType::kData;
  memcpy((char *)buf1.data + sizeof(*data_msg), message.c_str(),
         message.size());
  net.PostSend(
      server_addr, buf1, sizeof(*data_msg) + message.size(),
      [](Network &net, RdmaOp &op) { printf("Sent message to server\n"); });
Finally, wait for the server’s reply.
  // Wait for message from server
  while (!msg_received) {
    net.PollCompletion();
  }

  return 0;
Results
Chapter code: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/5_reverse.cpp