In the previous chapter, we implemented unidirectional receive and send. In this chapter, we will extend the previous program to implement bidirectional receive and send. After the server receives a message from the client, it will reverse the message and send it back to the client. We’ll name this program 5_reverse.cpp.

Different Types of Messages

To allow the server to send a message to the client, the server must first know the client’s address. Therefore, we’ll first have the client send its address to the server, then send a message to the server, and finally receive a message back from the server. To distinguish different types of messages, we’ll add a type field to the message header.

enum class AppMessageType : uint8_t {
  kConnect = 0,
  kData = 1,
};

struct AppMessageBase {
  AppMessageType type;
};

struct AppConnectMessage {
  AppMessageBase base;
  EfaAddress client_addr;
};

struct AppDataMessage {
  AppMessageBase base;
  // Data follows
};

Note that AppDataMessage doesn’t carry a length field. The sender specifies the total buffer length when it posts the SEND, and the receiver learns the received length when the RECV completes. Subtracting the header length from the received length gives the data length.
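As a minimal sketch of that subtraction, the helper below computes the payload length of a received DATA message. DataPayloadLength is a hypothetical name that isn’t part of the chapter’s code, and it takes a raw pointer and size instead of an RdmaOp so that it compiles on its own (C++17 for std::optional).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>

// Same layout as the chapter's header types.
enum class AppMessageType : uint8_t {
  kConnect = 0,
  kData = 1,
};

struct AppMessageBase {
  AppMessageType type;
};

struct AppDataMessage {
  AppMessageBase base;
  // Data follows
};

// Hypothetical helper (not in the chapter's code): returns the number of
// payload bytes in a received DATA message, or std::nullopt if the message
// is too short to hold the header or is not a DATA message.
inline std::optional<size_t> DataPayloadLength(const void *buf,
                                               size_t recv_size) {
  if (recv_size < sizeof(AppDataMessage)) return std::nullopt;
  AppMessageBase base;
  std::memcpy(&base, buf, sizeof(base));  // Copy out the header safely
  if (base.type != AppMessageType::kData) return std::nullopt;
  return recv_size - sizeof(AppDataMessage);
}
```

Checking the received size against the header size first keeps the unsigned subtraction from wrapping around on a truncated message.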

Why not merge the CONNECT and DATA messages into one?

  1. The server doesn’t need to send a CONNECT message to the client, so there’s no need to include CONNECT content in the DATA message.
  2. Separating these makes the communication logic clearer.
  3. The client can send multiple DATA messages without sending a CONNECT message each time.
  4. In later chapters, we’ll expand the CONNECT message to include more information.

Server-Side Logic

Because the client will send two messages, the server needs to maintain a state machine to handle different types of messages.

struct ReverseRequestState {
  fi_addr_t client_addr = FI_ADDR_UNSPEC;
  bool done = false;

  // ...
};

When receiving a CONNECT message, the server will add the client’s address to its address vector and save the corresponding fi_addr_t in the state machine.

struct ReverseRequestState {
  // ...

  void HandleConnect(Network &net, RdmaOp &op) {
    auto *base_msg = (const AppMessageBase *)op.recv.buf->data;
    CHECK(base_msg->type == AppMessageType::kConnect);
    CHECK(op.recv.recv_size == sizeof(AppConnectMessage));
    auto *msg = (const AppConnectMessage *)base_msg;
    printf("Received CONNECT message from client: %s\n",
           msg->client_addr.ToString().c_str());
    client_addr = net.AddPeerAddress(msg->client_addr);
  }
};

When receiving a DATA message, the server will reverse the message and send it back to the client. It will also set the state machine’s done to true.

struct ReverseRequestState {
  // ...

  void HandleData(Network &net, RdmaOp &op) {
    auto *base_msg = (const AppMessageBase *)op.recv.buf->data;
    CHECK(base_msg->type == AppMessageType::kData);
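    CHECK(op.recv.recv_size >= sizeof(*base_msg));
    auto *msg = (uint8_t *)op.recv.buf->data + sizeof(*base_msg);
    auto len = op.recv.recv_size - sizeof(*base_msg);
    printf("Received message (len=%zu): %.*s\n", len, (int)len, msg);
    // Reverse in place. Looping to len / 2 stays in bounds even when the
    // payload is empty, whereas starting from len - 1 would wrap around.
    for (size_t i = 0; i < len / 2; ++i) {
      auto t = msg[i];
      msg[i] = msg[len - 1 - i];
      msg[len - 1 - i] = t;
    }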
    net.PostSend(client_addr, *op.recv.buf, op.recv.recv_size,
                 [this](Network &net, RdmaOp &op) {
                   printf("Sent reversed message to client\n");
                   done = true;
                 });
  }
};
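An index-based reversal has to be careful with an empty payload, because size_t is unsigned and a length of 0 minus 1 wraps around to a huge value. One way to sidestep that, sketched here under the assumption that the header is a fixed number of bytes at the front of the buffer, is to hand the payload range to std::reverse. ReversePayloadInPlace is a hypothetical helper, not something from the chapter’s code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: reverses the payload bytes that follow a header of
// `header_size` bytes and leaves the header untouched. Returns false (and
// changes nothing) if the message is shorter than the header.
inline bool ReversePayloadInPlace(uint8_t *buf, size_t total_size,
                                  size_t header_size) {
  if (total_size < header_size) return false;
  // An empty range is fine for std::reverse, so a zero-length payload
  // needs no special case.
  std::reverse(buf + header_size, buf + total_size);
  return true;
}
```

Because the header stays in place, the same buffer and the same total size can be handed straight to a send, which is what HandleData does with the received buffer.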

Although we only serve one client at a time, we still need two buffers and must post both RECV operations in advance. If we waited for the CONNECT to arrive before posting the second RECV, the client’s DATA message might arrive before that RECV is posted, and it would be lost because the server has no RECV waiting for it.

Because both RECV operations are posted at the same time, we can’t use different callback functions to tell the two states apart. Instead, both use the same callback, which decides what to do next based on the state machine’s current state.

struct ReverseRequestState {
  // ...

  void OnRecv(Network &net, RdmaOp &op) {
    if (client_addr == FI_ADDR_UNSPEC) {
      HandleConnect(net, op);
    } else {
      HandleData(net, op);
    }
  }
};
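To make the reasoning about pre-posted RECVs concrete, here is a toy model of it. ToyEndpoint, ToyState, and RunScenario are all made up for illustration; they are not the chapter’s Network class and they don’t model libfabric. The model simply takes the premise above at face value: a message that arrives while no RECV is posted is lost.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a receive queue (an assumption for illustration only).
struct ToyEndpoint {
  using Callback = std::function<void(const std::string &)>;
  std::deque<Callback> posted_recvs;
  size_t dropped = 0;

  void PostRecv(Callback cb) { posted_recvs.push_back(std::move(cb)); }

  // Hand an incoming message to the oldest posted receive, or drop it
  // if nothing is posted.
  void Deliver(const std::string &msg) {
    if (posted_recvs.empty()) {
      ++dropped;
      return;
    }
    Callback cb = std::move(posted_recvs.front());
    posted_recvs.pop_front();
    cb(msg);
  }
};

// Same idea as OnRecv: one callback, behavior chosen by the current state.
struct ToyState {
  bool connected = false;
  std::vector<std::string> log;

  void OnRecv(const std::string &msg) {
    if (!connected) {
      connected = true;
      log.push_back("CONNECT:" + msg);
    } else {
      log.push_back("DATA:" + msg);
    }
  }
};

// Delivers two messages back to back and returns how many were lost.
inline size_t RunScenario(bool prepost_both, ToyState &s) {
  ToyEndpoint ep;
  auto cb = [&s](const std::string &m) { s.OnRecv(m); };
  ep.PostRecv(cb);
  if (prepost_both) ep.PostRecv(cb);
  ep.Deliver("addr");
  ep.Deliver("hello");  // Arrives before another RECV could be posted
  return ep.dropped;
}
```

With both receives posted up front, RunScenario(true, s) loses nothing and the shared callback sees the CONNECT followed by the DATA. With only one receive posted, the second message finds an empty queue and is dropped.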

Finally, here’s the complete server-side logic:

int ServerMain(int argc, char **argv) {
  struct fi_info *info = GetInfo();
  auto net = Network::Open(info);

  // Allocate and register memory
  auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf1);
  auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf2);

  // Loop forever. Accept one client at a time.
  for (;;) {
    // State machine
    ReverseRequestState s;
    // RECV for CONNECT and DATA
    net.PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    net.PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
    // Wait for completion
    while (!s.done) {
      net.PollCompletion();
    }
  }

  return 0;
}

Client-Side Logic

The client’s logic is also straightforward. First, we open the network interface and register two buffers.

int ClientMain(int argc, char **argv) {
  CHECK(argc == 2 || argc == 3);
  auto server_addrname = EfaAddress::Parse(argv[1]);
  std::string message = argc == 3 ? argv[2] : "Hello, world!";

  struct fi_info *info = GetInfo();
  auto net = Network::Open(info);
  auto server_addr = net.AddPeerAddress(server_addrname);

  auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf1);
  auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf2);

  // ...
}

Next, send a CONNECT message to the server.

  // Send address to server
  auto *connect_msg = (AppConnectMessage *)buf1.data;
  connect_msg->base.type = AppMessageType::kConnect;
  connect_msg->client_addr = net.addr;
  bool connect_sent = false;
  net.PostSend(server_addr, buf1, sizeof(*connect_msg),
               [&connect_sent](Network &net, RdmaOp &op) {
                 printf("Sent CONNECT message to server\n");
                 connect_sent = true;
               });
  while (!connect_sent) {
    net.PollCompletion();
  }

Then send a DATA message to the server. To make sure we don’t miss the server’s reply, we post a RECV operation before posting the SEND operation.

  // Prepare to receive reversed message from server
  bool msg_received = false;
  net.PostRecv(buf2, [&msg_received](Network &net, RdmaOp &op) {
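    // The server sends the whole buffer back, header included, so skip it.
    CHECK(op.recv.recv_size >= sizeof(AppDataMessage));
    auto *msg = (const char *)op.recv.buf->data + sizeof(AppDataMessage);
    auto len = op.recv.recv_size - sizeof(AppDataMessage);
    printf("Received message (len=%zu): %.*s\n", len, (int)len, msg);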
    msg_received = true;
  });

  // Send message to server
  auto *data_msg = (AppDataMessage *)buf1.data;
  data_msg->base.type = AppMessageType::kData;
  memcpy((char *)buf1.data + sizeof(*data_msg), message.c_str(),
         message.size());
  net.PostSend(
      server_addr, buf1, sizeof(*data_msg) + message.size(),
      [](Network &net, RdmaOp &op) { printf("Sent message to server\n"); });
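The memcpy above assumes the message fits in the buffer after the header. As a rough sketch of how the packing step could guard against an oversized message, here is a standalone helper. WriteDataMessage is a hypothetical name, and it works on a raw pointer and capacity rather than the chapter’s Buffer type so that it compiles by itself.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Same layout as the chapter's header types.
enum class AppMessageType : uint8_t { kConnect = 0, kData = 1 };
struct AppMessageBase { AppMessageType type; };
struct AppDataMessage { AppMessageBase base; };

// Hypothetical helper: writes a DATA header followed by `payload` into
// `buf`, which can hold `capacity` bytes. Returns the total number of bytes
// to send, or 0 if the header plus payload doesn't fit.
inline size_t WriteDataMessage(void *buf, size_t capacity,
                               const std::string &payload) {
  if (capacity < sizeof(AppDataMessage)) return 0;
  if (payload.size() > capacity - sizeof(AppDataMessage)) return 0;
  AppDataMessage header{};
  header.base.type = AppMessageType::kData;
  std::memcpy(buf, &header, sizeof(header));
  std::memcpy(static_cast<char *>(buf) + sizeof(header), payload.data(),
              payload.size());
  return sizeof(header) + payload.size();
}
```

The return value plays the role of the length argument passed to PostSend: the header size plus the payload size.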

Finally, wait for the server’s reply.

  // Wait for message from server
  while (!msg_received) {
    net.PollCompletion();
  }

  return 0;

Results

[Figure: 5_reverse]

Chapter code: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/5_reverse.cpp