After the groundwork in previous chapters, we can finally start writing code. This chapter’s goal is to implement unidirectional RECV and SEND between two machines using a single network card.

Although libfabric is written in C, I'm more comfortable with C++, so that's what I'll use. Since this series is a tutorial, I'll keep things simple rather than building elaborate C++ class abstractions. I'll also skip proper error handling for most return values and rely on a few assertion macros instead:

#define CHECK(stmt)                                                            \
  do {                                                                         \
    if (!(stmt)) {                                                             \
      fprintf(stderr, "%s:%d %s\n", __FILE__, __LINE__, #stmt);                \
      std::exit(1);                                                            \
    }                                                                          \
  } while (0)

#define FI_CHECK(stmt)                                                         \
  do {                                                                         \
    int rc = (stmt);                                                           \
    if (rc) {                                                                  \
      fprintf(stderr, "%s:%d %s failed with %d (%s)\n", __FILE__, __LINE__,    \
              #stmt, rc, fi_strerror(-rc));                                    \
      std::exit(1);                                                            \
    }                                                                          \
  } while (0)

Getting Network Interface Information

In the previous chapter, we learned that libfabric supports many devices, and EFA supports two different protocols (RDM and DGRAM). In this series, we’ll focus on using EFA with the RDM protocol. So first, we need to use fi_getinfo() to select the libfabric network interfaces that meet our requirements.

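#include <cstdio>   // printf, fprintf
#include <cstdlib>  // std::exit
#include <cstring>  // strdup, memcpy

#include <rdma/fabric.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_rma.h>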

struct fi_info *GetInfo() {
  struct fi_info *hints, *info;
  hints = fi_allocinfo();
  hints->ep_attr->type = FI_EP_RDM;
  hints->fabric_attr->prov_name = strdup("efa");
  FI_CHECK(fi_getinfo(FI_VERSION(2, 0), nullptr, nullptr, 0, hints, &info));
  fi_freeinfo(hints);
  return info;
}

In this code, we use hints to request the RDM endpoint type and the efa provider. fi_getinfo() returns all matching network interfaces as a linked list.

Next, let’s traverse this linked list and output basic information about all network interfaces as a basic correctness check.

int main() {
  struct fi_info *info = GetInfo();
  for (auto *fi = info; fi; fi = fi->next) {
    printf("domain: %14s", fi->domain_attr->name);
    printf(", nic: %10s", fi->nic->device_attr->name);
    printf(", fabric: %s", fi->fabric_attr->prov_name);
    printf(", link: %.0fGbps", fi->nic->link_attr->speed / 1e9);
    printf("\n");
  }
  return 0;
}

Let’s save the program as src/4_hello.cpp. In the previous chapter, we installed libfabric in the build/libfabric/ folder. We can compile using the following command:

g++ -Wall -Werror -std=c++17 -O2 -g \
  -I./build/libfabric/include \
  -o build/4_hello src/4_hello.cpp \
  -L./build/libfabric/lib -lfabric

Running the compiled executable gives us the following output, which confirms that our basic hardware and software setup works:

domain:  rdmap79s0-rdm, nic:  rdmap79s0, fabric: efa, link: 100Gbps
domain:  rdmap80s0-rdm, nic:  rdmap80s0, fabric: efa, link: 100Gbps
domain:  rdmap81s0-rdm, nic:  rdmap81s0, fabric: efa, link: 100Gbps
[... output truncated for brevity ...]
domain: rdmap200s0-rdm, nic: rdmap200s0, fabric: efa, link: 100Gbps
domain: rdmap201s0-rdm, nic: rdmap201s0, fabric: efa, link: 100Gbps

Opening the Network Interface

[Figure: libfabric software object model]

In the previous chapter, we introduced libfabric’s software object model, which has a tree structure. However, since we only plan to use one endpoint here, for simplicity, we can put all objects in a single struct:

struct Network {
  struct fi_info *fi;
  struct fid_fabric *fabric;
  struct fid_domain *domain;
  struct fid_cq *cq;
  struct fid_av *av;
  struct fid_ep *ep;
  // ... other fields

  static Network Open(struct fi_info *fi);
  // ... other methods
};

Let’s implement the Open() method to open the network interface specified by fi:

Network Network::Open(struct fi_info *fi) {
  struct fid_fabric *fabric;
  FI_CHECK(fi_fabric(fi->fabric_attr, &fabric, nullptr));

  struct fid_domain *domain;
  FI_CHECK(fi_domain(fabric, fi, &domain, nullptr));

  struct fid_cq *cq;
  struct fi_cq_attr cq_attr = {};
  cq_attr.format = FI_CQ_FORMAT_DATA;
  FI_CHECK(fi_cq_open(domain, &cq_attr, &cq, nullptr));

  struct fid_av *av;
  struct fi_av_attr av_attr = {};
  FI_CHECK(fi_av_open(domain, &av_attr, &av, nullptr));

  struct fid_ep *ep;
  FI_CHECK(fi_endpoint(domain, fi, &ep, nullptr));
  FI_CHECK(fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV));
  FI_CHECK(fi_ep_bind(ep, &av->fid, 0));

  FI_CHECK(fi_enable(ep));
  //...
}

In this code, we first create the fabric, then create the domain based on fi.

Next, we create the completion queue cq and address vector av. For the completion queue, we specify the FI_CQ_FORMAT_DATA format. Later when we poll the completion queue, we’ll get a fi_cq_data_entry struct:

struct fi_cq_data_entry {
  void     *op_context;
  uint64_t flags;
  size_t   len;
  void     *buf;
  uint64_t data;
};

  • op_context is the context we pass when submitting an operation, used to distinguish different operations. We’ll use this later.
  • flags is a bitmask indicating the type of completion event. We’ll use this later.
  • buf and len represent the data buffer and length for this completion event. This is useful for RECV operations.
  • data is the immediate data carried by WRITE_IMM operations. We won’t use this in this chapter but will use it in later chapters.

Other completion queue formats can be found in fi_cq(3).

Then we create the endpoint ep, bind the completion queue and address vector to it. Finally, we enable the endpoint.

EFA Endpoint Address

Let’s also save the endpoint’s address in Network::Open(). According to the EFA RDM protocol documentation, the EFA endpoint address is 32 bytes long. We can use fi_getname() to get the endpoint address:

Network Network::Open(struct fi_info *fi) {
  // ...
  uint8_t addr[64];
  size_t addrlen = sizeof(addr);
  FI_CHECK(fi_getname(&ep->fid, addr, &addrlen));
  CHECK(addrlen == 32);
  // ...
}

This address is in binary format. To make it easier for input and output, we can convert this address to a hexadecimal string:

struct EfaAddress {
  uint8_t bytes[32];

  explicit EfaAddress(uint8_t bytes[32]) { memcpy(this->bytes, bytes, 32); }

  std::string ToString() const {
    char buf[65];
    for (size_t i = 0; i < 32; i++) {
      snprintf(buf + 2 * i, 3, "%02x", bytes[i]);
    }
    return std::string(buf, 64);
  }

  static EfaAddress Parse(const std::string &str) {
    CHECK(str.size() == 64);
    uint8_t bytes[32];
    for (size_t i = 0; i < 32; i++) {
      sscanf(str.c_str() + 2 * i, "%02hhx", &bytes[i]);
    }
    return EfaAddress(bytes);
  }
};
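
To make the encoding concrete, here is a minimal standalone sketch of the same hex round trip. The names ToHex(), FromHex(), HexValue(), and AddrBytes are hypothetical and not part of the chapter's code. Unlike Parse() above, this version reports malformed input instead of silently accepting it, which may be worth considering since the address is typed in by hand.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical standalone helpers mirroring EfaAddress::ToString()/Parse().
constexpr size_t kAddrLen = 32;
using AddrBytes = std::array<uint8_t, kAddrLen>;

// Encodes each byte as two lowercase hex digits.
std::string ToHex(const AddrBytes &bytes) {
  static const char kDigits[] = "0123456789abcdef";
  std::string out;
  out.reserve(2 * kAddrLen);
  for (uint8_t b : bytes) {
    out.push_back(kDigits[b >> 4]);
    out.push_back(kDigits[b & 0x0f]);
  }
  return out;
}

// Returns the value of one hex digit, or -1 if `c` is not a hex digit.
int HexValue(char c) {
  if (c >= '0' && c <= '9') return c - '0';
  if (c >= 'a' && c <= 'f') return c - 'a' + 10;
  if (c >= 'A' && c <= 'F') return c - 'A' + 10;
  return -1;
}

// Decodes a 64-character hex string. Returns false on malformed input.
bool FromHex(const std::string &str, AddrBytes *out) {
  if (str.size() != 2 * kAddrLen) return false;
  for (size_t i = 0; i < kAddrLen; i++) {
    int hi = HexValue(str[2 * i]);
    int lo = HexValue(str[2 * i + 1]);
    if (hi < 0 || lo < 0) return false;
    (*out)[i] = static_cast<uint8_t>((hi << 4) | lo);
  }
  return true;
}
```

Encoding an address with ToHex() and decoding the result with FromHex() should give back the original 32 bytes.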

Save this address in the Network struct:

struct Network {
  // ...
  EfaAddress addr;
  // ...
};

Network Network::Open(struct fi_info *fi) {
  // ...
  return Network{fi, fabric, domain, cq, av, ep, EfaAddress(addr)};
}

Adding Target Address

In previous chapters, we mentioned that to reduce address resolution overhead, libfabric requires us to add the target address to the address vector before initiating network operations. The fi_av_insert() function adds addresses and returns a handle of type fi_addr_t for each one. Let’s add a method to add target addresses:

fi_addr_t Network::AddPeerAddress(const EfaAddress &peer_addr) {
  fi_addr_t addr = FI_ADDR_UNSPEC;
  int ret = fi_av_insert(av, peer_addr.bytes, 1, &addr, 0, nullptr);
  if (ret != 1) {
    fprintf(stderr, "fi_av_insert failed: %d\n", ret);
    std::exit(1);
  }
  return addr;
}

Buffers

In libfabric, data transmission buffers are provided by the user. We’ll use a simple struct to store basic buffer information. Besides pointer and length, one thing to note is memory alignment. In the aws-ofi-nccl code, I found they use 128-byte alignment. Although I haven’t found any alignment requirements in libfabric or EFA documentation, to be safe, we’ll also use 128-byte alignment.

constexpr size_t kBufAlign = 128; // Not a documented EFA requirement; matches aws-ofi-nccl

void *align_up(void *ptr, size_t align) {
  uintptr_t addr = (uintptr_t)ptr;
  return (void *)((addr + align - 1) & ~(align - 1));
}

struct Buffer {
  void *data;
  size_t size;

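  static Buffer Alloc(size_t size, size_t align) {
    // Over-allocate so that at least `size` bytes remain after aligning up.
    size_t raw_size = size + align - 1;
    void *raw_data = malloc(raw_size);
    CHECK(raw_data != nullptr);
    return Buffer(raw_data, raw_size, align);
  }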

  Buffer(Buffer &&other)
      : data(other.data), size(other.size), raw_data(other.raw_data) {
    other.data = nullptr;
    other.raw_data = nullptr;
  }

  ~Buffer() { free(raw_data); }

private:
  void *raw_data;

  Buffer(void *raw_data, size_t raw_size, size_t align) {
    this->raw_data = raw_data;
    this->data = align_up(raw_data, align);
    this->size = (size_t)((uintptr_t)raw_data + raw_size - (uintptr_t)data);
  }
  Buffer(const Buffer &) = delete;
};
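
The rounding in align_up() is easy to get wrong, so here is a small standalone sketch of the same arithmetic on plain integers. AlignUp(), IsPowerOfTwo(), and RawSizeFor() are hypothetical names used only for illustration. The bit trick assumes the alignment is a power of two, and aligning up can skip as many as align - 1 bytes at the front of an allocation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// The bit trick below only works when `align` is a power of two.
constexpr bool IsPowerOfTwo(size_t x) { return x != 0 && (x & (x - 1)) == 0; }

// Same rounding as align_up(), applied to an integer address.
constexpr uintptr_t AlignUp(uintptr_t addr, size_t align) {
  return (addr + align - 1) & ~(static_cast<uintptr_t>(align) - 1);
}

// Aligning up may skip as many as align - 1 bytes, so this many raw bytes
// guarantee that `size` usable bytes remain after alignment.
constexpr size_t RawSizeFor(size_t size, size_t align) {
  return size + align - 1;
}

static_assert(IsPowerOfTwo(128));
static_assert(AlignUp(0, 128) == 0);    // already aligned
static_assert(AlignUp(1, 128) == 128);  // rounds up to the next boundary
static_assert(AlignUp(128, 128) == 128);
static_assert(AlignUp(129, 128) == 256);
```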

Registering Memory Regions

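As mentioned in previous chapters, to allow the network card to access buffers in user virtual address space, we need to register these memory regions with the fi_mr_regattr() function. We keep the resulting fid_mr handles in a map keyed by the buffer's data pointer so that we can look them up when submitting operations.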

struct Network {
  // ...
  std::unordered_map<void *, struct fid_mr *> mr;
  void RegisterMemory(Buffer &buf);
  struct fid_mr *GetMR(const Buffer &buf);
};

void Network::RegisterMemory(Buffer &buf) {
  struct fid_mr *mr;
  struct fi_mr_attr mr_attr = {};
  struct iovec iov = {.iov_base = buf.data, .iov_len = buf.size};
  mr_attr.mr_iov = &iov;
  mr_attr.iov_count = 1;
  mr_attr.access = FI_SEND | FI_RECV;
  uint64_t flags = 0;
  FI_CHECK(fi_mr_regattr(domain, &mr_attr, flags, &mr));
  this->mr[buf.data] = mr;
}

struct fid_mr *Network::GetMR(const Buffer &buf) {
  auto it = mr.find(buf.data);
  CHECK(it != mr.end());
  return it->second;
}

Submitting Operations

Next, let’s look at how to submit an RDMA network operation and how to maintain operation context.

Operation Context

When a network operation completes, we need to trigger subsequent application logic. To distinguish different operations, we use an RdmaOp struct to store the operation’s type, data, and callback function. We pass a pointer to this RdmaOp as the context when submitting an operation, and when the operation completes, we use this context to call the callback function.

enum class RdmaOpType : uint8_t {
  kRecv = 0,
  kSend = 1,
};

struct RdmaRecvOp {
  Buffer *buf;
  fi_addr_t src_addr; // Set after completion
  size_t recv_size;   // Set after completion
};
static_assert(std::is_pod_v<RdmaRecvOp>);

struct RdmaSendOp {
  Buffer *buf;
  size_t len;
  fi_addr_t dest_addr;
};
static_assert(std::is_pod_v<RdmaSendOp>);

struct RdmaOp {
  RdmaOpType type;
  union {
    RdmaRecvOp recv;
    RdmaSendOp send;
  };
  std::function<void(Network &, RdmaOp &)> callback;
};

RECV Operation

To receive data, we need to submit a RECV operation. In libfabric, we use the fi_recvmsg() function to submit a RECV operation.

void Network::PostRecv(Buffer &buf,
                       std::function<void(Network &, RdmaOp &)> &&callback) {
  auto *op = new RdmaOp{
      .type = RdmaOpType::kRecv,
      .recv =
          RdmaRecvOp{.buf = &buf, .src_addr = FI_ADDR_UNSPEC, .recv_size = 0},
      .callback = std::move(callback),
  };
  struct iovec iov = {
      .iov_base = buf.data,
      .iov_len = buf.size,
  };
  struct fi_msg msg = {
      .msg_iov = &iov,
      .desc = &GetMR(buf)->mem_desc,
      .iov_count = 1,
      .addr = FI_ADDR_UNSPEC,
      .context = op,
  };
  FI_CHECK(fi_recvmsg(ep, &msg, 0)); // TODO: handle EAGAIN
}

In the code above, we first allocate an RdmaOp object, directly using new. We’ll free this memory when the operation completes.

When calling fi_recvmsg(), we pass in the buffer and its memory region descriptor, along with the RdmaOp object as context. We haven’t specified a source address, so this RECV operation can receive data from any address.

Notably, we don’t handle a retryable failure of fi_recvmsg(): FI_CHECK simply aborts on any nonzero return value. When network operations are very frequent, fi_recvmsg() might return -FI_EAGAIN, indicating that the provider temporarily cannot accept another operation. In that case, we need to wait for a while (typically polling the completion queue in the meantime) before submitting the RECV operation again. Since in this chapter we’re only receiving and sending one message at a time, we probably won’t encounter this situation. Therefore, we won’t handle this case in this chapter, but we will handle it in later chapters.
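
For reference, a retry loop for this case could look roughly like the sketch below. This is not how later chapters handle it. RetryOnEagain() is a hypothetical helper, and the sketch uses the system EAGAIN constant on the assumption that FI_EAGAIN is defined to the same value. The attempt limit is also an assumption, added so that the loop cannot spin forever.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/types.h>  // ssize_t

// Hypothetical helper, not part of libfabric.
// Calls `post()` until it returns something other than -EAGAIN, calling
// `progress()` between attempts. Gives up after `max_attempts` tries and
// returns the last result (which is then still -EAGAIN).
template <typename PostFn, typename ProgressFn>
ssize_t RetryOnEagain(PostFn &&post, ProgressFn &&progress, int max_attempts) {
  ssize_t rc = -EAGAIN;
  for (int attempt = 0; attempt < max_attempts; attempt++) {
    rc = post();
    if (rc != -EAGAIN) {
      return rc;  // Success (0) or a non-retryable error.
    }
    progress();  // e.g. poll the completion queue to free up resources.
  }
  return rc;
}

// Possible use inside Network::PostRecv() (assumed, not the chapter's code):
//
//   FI_CHECK(RetryOnEagain([&] { return fi_recvmsg(ep, &msg, 0); },
//                          [&] { PollCompletion(); }, 1000));
```

One thing to watch out for with this pattern: polling inside a submit call can run completion callbacks, which may themselves submit new operations.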

SEND Operation

The SEND operation is similar to the RECV operation, but we need to specify the target address.

void Network::PostSend(fi_addr_t addr, Buffer &buf, size_t len,
                       std::function<void(Network &, RdmaOp &)> &&callback) {
  CHECK(len <= buf.size);
  auto *op = new RdmaOp{
      .type = RdmaOpType::kSend,
      .send = RdmaSendOp{.buf = &buf, .len = len, .dest_addr = addr},
      .callback = std::move(callback),
  };
  struct iovec iov = {
      .iov_base = buf.data,
      .iov_len = len,
  };
  struct fi_msg msg = {
      .msg_iov = &iov,
      .desc = &GetMR(buf)->mem_desc,
      .iov_count = 1,
      .addr = addr,
      .context = op,
  };
  FI_CHECK(fi_sendmsg(ep, &msg, 0)); // TODO: handle EAGAIN
}

Polling the Completion Queue

We need to poll the completion queue to get operation completion events. We can use the fi_cq_read() function to read the completion queue. We need to provide an array to store completion events and its length. Since we previously specified the completion queue format as FI_CQ_FORMAT_DATA, we can allocate a fi_cq_data_entry array on the stack to store completion events. If the number of completion events exceeds the array length, we’ll need to call fi_cq_read() multiple times to read all events.

constexpr size_t kCompletionQueueReadCount = 16;

void Network::PollCompletion() {
  struct fi_cq_data_entry cqe[kCompletionQueueReadCount];
  for (;;) {
    auto ret = fi_cq_read(cq, cqe, kCompletionQueueReadCount);
    // ...
  }
}

fi_cq_read() can return several different values:

  • If the return value is positive, it is the number of completion events that were read. We can iterate through the cqe array to handle each of them.
  • If the return value is -FI_EAGAIN, there are no more completion events. We can exit the loop.
  • If the return value is -FI_EAVAIL, there is an error event in the completion queue. We can use the fi_cq_readerr() function to read it.
    • If fi_cq_readerr() returns 1, it successfully read an error event. We can output the error message.
    • If fi_cq_readerr() returns any other value, another error occurred. We exit the program.

Putting this together, the body of the loop in PollCompletion() becomes:

    if (ret > 0) {
      for (ssize_t i = 0; i < ret; i++) {
        HandleCompletion(*this, cqe[i]);
      }
    } else if (ret == -FI_EAGAIN) {
      break;  // No more completions
    } else if (ret == -FI_EAVAIL) {
      struct fi_cq_err_entry err_entry = {};  // zero-init: err_data fields are also inputs
      ret = fi_cq_readerr(cq, &err_entry, 0);
      if (ret > 0) {
        fprintf(stderr, "Failed libfabric operation: %s\n",
                fi_cq_strerror(cq, err_entry.prov_errno, err_entry.err_data,
                               nullptr, 0));
      } else {
        fprintf(stderr, "fi_cq_readerr error: %zd (%s)\n", ret,
                fi_strerror(-ret));
        std::exit(1);
      }
    } else {
      fprintf(stderr, "fi_cq_read error: %zd (%s)\n", ret, fi_strerror(-ret));
      std::exit(1);
    }
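
To illustrate why the read sits inside a loop, here is a standalone sketch that drains a fake queue in batches of 16. FakeCq and Drain() are hypothetical stand-ins and do not use libfabric. They only mimic the "positive count or -EAGAIN" convention described above.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <sys/types.h>  // ssize_t

// Hypothetical stand-in for a completion queue holding `pending` events.
struct FakeCq {
  size_t pending;

  // Mimics the return convention of fi_cq_read(): the number of entries
  // read, or -EAGAIN when the queue is empty.
  ssize_t Read(int *entries, size_t count) {
    if (pending == 0) return -EAGAIN;
    size_t n = pending < count ? pending : count;
    for (size_t i = 0; i < n; i++) entries[i] = 1;
    pending -= n;
    return static_cast<ssize_t>(n);
  }
};

struct DrainStats {
  size_t events;  // Total events handled.
  size_t reads;   // Number of Read() calls, including the final empty one.
};

// Keeps reading fixed-size batches until the queue reports it is empty.
DrainStats Drain(FakeCq &cq) {
  constexpr size_t kBatch = 16;
  int batch[kBatch];
  DrainStats stats{0, 0};
  for (;;) {
    ssize_t ret = cq.Read(batch, kBatch);
    stats.reads++;
    if (ret == -EAGAIN) break;  // No more completions.
    stats.events += static_cast<size_t>(ret);
  }
  return stats;
}
```

With 40 pending events, the loop reads batches of 16, 16, and 8, and then one more read reports that the queue is empty.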

When handling a completion event, we first read op_context. If op_context == nullptr, no context was provided when the operation was submitted, so we ignore the event.

Otherwise, we determine the type of the completion event from flags. If the FI_RECV bit is set, we record len as the received size and call the callback function. If the FI_SEND bit is set, we just call the callback function.

Finally, we need to free the RdmaOp object.

void HandleCompletion(Network &net, const struct fi_cq_data_entry &cqe) {
  auto comp_flags = cqe.flags;
  auto op = (RdmaOp *)cqe.op_context;
  if (!op) {
    return;
  }
  if (comp_flags & FI_RECV) {
    op->recv.recv_size = cqe.len;
    if (op->callback)
      op->callback(net, *op);
  } else if (comp_flags & FI_SEND) {
    if (op->callback)
      op->callback(net, *op);
  } else {
    fprintf(stderr, "Unhandled completion type. comp_flags=%lx\n", comp_flags);
    std::exit(1);
  }
  delete op;
}

Since in this chapter we only have RECV and SEND operations, we only handle these two types of completion events. In later chapters, we’ll handle more operation types.
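
The new/delete pairing across submission and completion can also be expressed with std::unique_ptr. The sketch below is an alternative, not what the chapter's code does. Op, ToContext(), and Complete() are hypothetical names, and Op is a simplified stand-in for RdmaOp.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Simplified, hypothetical stand-in for RdmaOp.
struct Op {
  int id;
  std::function<void(Op &)> callback;
};

// Submit side: hand ownership over to the opaque context pointer that
// travels with the operation.
void *ToContext(std::unique_ptr<Op> op) { return op.release(); }

// Completion side: take ownership back. The Op is freed when `op` goes out
// of scope, on every return path.
void Complete(void *context) {
  std::unique_ptr<Op> op(static_cast<Op *>(context));
  if (!op) {
    return;  // Operation was submitted without a context.
  }
  if (op->callback) {
    op->callback(*op);
  }
}
```

The benefit is mostly that an early return added to the completion handler later cannot leak the operation object.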

Server Logic

Having completed all the preparation work above, we can now write the server logic.

On the server side, we first open the network interface, then register a buffer, then submit a RECV operation. When the RECV operation completes, we can read the data and submit another RECV operation.

How do we let the client know the server’s address? Many other example programs either choose to start another TCP socket to exchange addresses or fork() a child process to run the client. Here we choose a simpler method: print out the address on the server side and let users input it manually.

constexpr size_t kMessageBufferSize = 8192;

int ServerMain(int argc, char **argv) {
  struct fi_info *info = GetInfo();
  auto net = Network::Open(info);
  printf("domain: %14s", info->domain_attr->name);
  printf(", nic: %10s", info->nic->device_attr->name);
  printf(", fabric: %s", info->fabric_attr->prov_name);
  printf(", link: %.0fGbps", info->nic->link_attr->speed / 1e9);
  printf("\n");
  printf("Run client with the following command:\n");
  printf("  %s %s\n", argv[0], net.addr.ToString().c_str());
  printf("  %s %s \"anytext\"\n", argv[0], net.addr.ToString().c_str());
  printf("------\n");

  auto buf_msg = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf_msg);
  net.PostRecv(buf_msg, [](Network &net, RdmaOp &op) {
    auto *msg = (const char *)op.recv.buf->data;
    auto len = op.recv.recv_size;
    printf("Received message (len=%zu): %.*s\n", len, (int)len, msg);
    net.PostRecv(*op.recv.buf, std::move(op.callback));
  });

  for (;;) {
    net.PollCompletion();
  }

  return 0;
}

Client Logic

The client logic is similar to the server logic. We first open the network interface, then add the server’s address to the address vector, register a buffer, and submit a SEND operation. When the SEND operation completes, the client’s logic ends.

int ClientMain(int argc, char **argv) {
  CHECK(argc == 2 || argc == 3);
  auto server_addrname = EfaAddress::Parse(argv[1]);
  std::string message = argc == 3 ? argv[2] : "Hello, world!";

  struct fi_info *info = GetInfo();
  auto net = Network::Open(info);
  printf("domain: %14s", info->domain_attr->name);
  printf(", nic: %10s", info->nic->device_attr->name);
  printf(", fabric: %s", info->fabric_attr->prov_name);
  printf(", link: %.0fGbps", info->nic->link_attr->speed / 1e9);
  printf("\n");
  auto server_addr = net.AddPeerAddress(server_addrname);
  auto buf_msg = Buffer::Alloc(kMessageBufferSize, kBufAlign);
  net.RegisterMemory(buf_msg);
  CHECK(message.size() <= buf_msg.size);  // Don't overflow the send buffer
  memcpy(buf_msg.data, message.data(), message.size());

  bool sent = false;
  net.PostSend(server_addr, buf_msg, message.size(),
               [&sent](Network &net, RdmaOp &op) {
                 auto *msg = (const char *)op.send.buf->data;
                 auto len = op.send.len;
                 printf("Sent message (len=%zu): %.*s\n", len, (int)len, msg);
                 sent = true;
               });
  while (!sent) {
    net.PollCompletion();
  }

  return 0;
}
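
The server prints a command line that reuses argv[0], so the same binary presumably acts as server or client depending on its arguments. A minimal sketch of that dispatch, assuming "no arguments means server", might look like this. SelectRole() and Role are hypothetical names.

```cpp
#include <cassert>

// Hypothetical helper: decide which role to run from the argument count.
enum class Role { kServer, kClient };

Role SelectRole(int argc) {
  // Only argv[0]: run as server.
  // An address (and optionally a message): run as client.
  return argc == 1 ? Role::kServer : Role::kClient;
}

// The entry point could then look like this (assumed, shown as a comment so
// that this sketch stays independent of the rest of the chapter's code):
//
// int main(int argc, char **argv) {
//   return SelectRole(argc) == Role::kServer ? ServerMain(argc, argv)
//                                            : ClientMain(argc, argv);
// }
```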

Results

[Figure: 4_hello]

Chapter code: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/4_hello.cpp