Harnessing 3200Gbps Network (4): Unidirectional SEND and RECV
After the groundwork in the previous chapters, we can finally start writing code. This chapter’s goal is to implement unidirectional RECV and SEND between two machines using a single network card.
Although libfabric is written in C, I’m not very familiar with C, so I’ll use C++, which I’m more comfortable with. Since this series is a tutorial, I won’t deliberately wrap things in C++ classes and will keep the code simple. I’ll also skip handling many error return values, adding only a few assertions as checks:
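#include <cstdio>
#include <cstdlib>
// CHECK(cond): abort with file/line info if a boolean condition is false.
#define CHECK(stmt) \
do { \
if (!(stmt)) { \
fprintf(stderr, "%s:%d %s\n", __FILE__, __LINE__, #stmt); \
std::exit(1); \
} \
} while (0)
// FI_CHECK(call): abort if a libfabric call returns nonzero. libfabric returns
// 0 on success and a negative error code on failure, hence fi_strerror(-rc).
#define FI_CHECK(stmt) \
do { \
int rc = (stmt); \
if (rc) { \
fprintf(stderr, "%s:%d %s failed with %d (%s)\n", __FILE__, __LINE__, \
#stmt, rc, fi_strerror(-rc)); \
std::exit(1); \
} \
} while (0)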
Getting Network Interface Information
In the previous chapter, we learned that libfabric supports many devices, and that EFA supports two protocols (RDM and DGRAM). In this series, we’ll focus on using EFA with the RDM protocol. So first, we use fi_getinfo() to find the libfabric network interfaces that meet our requirements.
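// Standard headers used by the code in this chapter
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>
#include <type_traits>
#include <unordered_map>
// libfabric headers
#include <rdma/fabric.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_rma.h>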
struct fi_info *GetInfo() {
struct fi_info *hints, *info;
hints = fi_allocinfo();
hints->ep_attr->type = FI_EP_RDM;
hints->fabric_attr->prov_name = strdup("efa");
FI_CHECK(fi_getinfo(FI_VERSION(2, 0), nullptr, nullptr, 0, hints, &info));
fi_freeinfo(hints);
return info;
}
In this code, we specify in hints that the endpoint type is RDM and the provider name is efa. fi_getinfo() returns all matching network interfaces as a linked list.
Next, let’s traverse this linked list and print basic information about each network interface as a quick sanity check.
int main() {
struct fi_info *info = GetInfo();
for (auto *fi = info; fi; fi = fi->next) {
printf("domain: %14s", fi->domain_attr->name);
printf(", nic: %10s", fi->nic->device_attr->name);
printf(", fabric: %s", fi->fabric_attr->prov_name);
printf(", link: %.0fGbps", fi->nic->link_attr->speed / 1e9);
printf("\n");
}
return 0;
}
Let’s save the program as src/4_hello.cpp. In the previous chapter, we installed libfabric in the build/libfabric/ folder, so we can compile with the following command:
g++ -Wall -Werror -std=c++17 -O2 -g \
-I./build/libfabric/include \
-o build/4_hello src/4_hello.cpp \
-L./build/libfabric/lib -lfabric
Running the compiled executable gives us the following output, proving that our basic hardware and software setup is correct:
domain: rdmap79s0-rdm, nic: rdmap79s0, fabric: efa, link: 100Gbps
domain: rdmap80s0-rdm, nic: rdmap80s0, fabric: efa, link: 100Gbps
domain: rdmap81s0-rdm, nic: rdmap81s0, fabric: efa, link: 100Gbps
[... output truncated for brevity ...]
domain: rdmap200s0-rdm, nic: rdmap200s0, fabric: efa, link: 100Gbps
domain: rdmap201s0-rdm, nic: rdmap201s0, fabric: efa, link: 100Gbps
Opening the Network Interface
In the previous chapter, we introduced libfabric’s software object model, which has a tree structure. Since we only plan to use one endpoint here, we can keep things simple and put all objects in a single struct:
struct Network {
struct fi_info *fi;
struct fid_fabric *fabric;
struct fid_domain *domain;
struct fid_cq *cq;
struct fid_av *av;
struct fid_ep *ep;
// ... other fields
static Network Open(struct fi_info *fi);
// ... other methods
};
Let’s implement the Open() method to open the network interface specified by fi:
Network Network::Open(struct fi_info *fi) {
struct fid_fabric *fabric;
FI_CHECK(fi_fabric(fi->fabric_attr, &fabric, nullptr));
struct fid_domain *domain;
FI_CHECK(fi_domain(fabric, fi, &domain, nullptr));
struct fid_cq *cq;
struct fi_cq_attr cq_attr = {};
cq_attr.format = FI_CQ_FORMAT_DATA;
FI_CHECK(fi_cq_open(domain, &cq_attr, &cq, nullptr));
struct fid_av *av;
struct fi_av_attr av_attr = {};
FI_CHECK(fi_av_open(domain, &av_attr, &av, nullptr));
struct fid_ep *ep;
FI_CHECK(fi_endpoint(domain, fi, &ep, nullptr));
FI_CHECK(fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV));
FI_CHECK(fi_ep_bind(ep, &av->fid, 0));
FI_CHECK(fi_enable(ep));
//...
}
In this code, we first create the fabric, then create the domain based on fi.
Next, we create the completion queue cq and the address vector av. For the completion queue, we specify the FI_CQ_FORMAT_DATA format, so when we later poll the completion queue, we’ll get fi_cq_data_entry structs:
struct fi_cq_data_entry {
void *op_context;
uint64_t flags;
size_t len;
void *buf;
uint64_t data;
};
- op_context is the context we pass when submitting an operation, used to tell operations apart. We’ll use it later.
- flags is a bitmask indicating the type of completion event. We’ll also use it later.
- buf and len describe the data buffer and length for this completion event, which is useful for RECV operations.
- data is the immediate data carried by WRITE_IMM operations. We won’t use it in this chapter, but we will in later chapters.
Other completion queue formats are described in fi_cq(3).
Then we create the endpoint ep and bind the completion queue and address vector to it. Finally, we enable the endpoint.
EFA Endpoint Address
Let’s also save the endpoint’s address in Network::Open(). According to the EFA RDM protocol documentation, the EFA endpoint address is 32 bytes long. We can get it with fi_getname():
Network Network::Open(struct fi_info *fi) {
// ...
uint8_t addr[64];
size_t addrlen = sizeof(addr);
FI_CHECK(fi_getname(&ep->fid, addr, &addrlen));
CHECK(addrlen == 32); // a boolean check, so CHECK rather than FI_CHECK
// ...
}
This address is in binary format. To make it easy to print and type in, we convert it to and from a hexadecimal string:
struct EfaAddress {
uint8_t bytes[32];
explicit EfaAddress(uint8_t bytes[32]) { memcpy(this->bytes, bytes, 32); }
std::string ToString() const {
char buf[65];
for (size_t i = 0; i < 32; i++) {
snprintf(buf + 2 * i, 3, "%02x", bytes[i]);
}
return std::string(buf, 64);
}
static EfaAddress Parse(const std::string &str) {
CHECK(str.size() == 64);
uint8_t bytes[32];
for (size_t i = 0; i < 32; i++) {
CHECK(sscanf(str.c_str() + 2 * i, "%2hhx", &bytes[i]) == 1);
}
return EfaAddress(bytes);
}
};
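The hex conversion can also be written without snprintf/sscanf, which makes it easy to reject malformed input, such as a mistyped address, explicitly instead of aborting. The sketch below is standalone (no libfabric needed); BytesToHex, HexToBytes, and HexDigit are hypothetical helpers, not part of the chapter’s code:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helper: encode n bytes as 2*n lowercase hex characters.
std::string BytesToHex(const uint8_t *bytes, size_t n) {
  static const char kDigits[] = "0123456789abcdef";
  std::string out(2 * n, '0');
  for (size_t i = 0; i < n; i++) {
    out[2 * i] = kDigits[bytes[i] >> 4];        // high nibble
    out[2 * i + 1] = kDigits[bytes[i] & 0x0f];  // low nibble
  }
  return out;
}

// Value of one hex digit, or -1 if c is not a hex digit.
static int HexDigit(char c) {
  if (c >= '0' && c <= '9') return c - '0';
  if (c >= 'a' && c <= 'f') return c - 'a' + 10;
  if (c >= 'A' && c <= 'F') return c - 'A' + 10;
  return -1;
}

// Hypothetical helper: decode str into n bytes. Returns false on a wrong
// length or any non-hex character instead of silently producing garbage.
bool HexToBytes(const std::string &str, uint8_t *out, size_t n) {
  if (str.size() != 2 * n) return false;
  for (size_t i = 0; i < n; i++) {
    int hi = HexDigit(str[2 * i]);
    int lo = HexDigit(str[2 * i + 1]);
    if (hi < 0 || lo < 0) return false;
    out[i] = static_cast<uint8_t>((hi << 4) | lo);
  }
  return true;
}
```

For a 32-byte EFA address, BytesToHex(addr, 32) yields the same 64-character string as EfaAddress::ToString(), and HexToBytes() can back a Parse() that reports bad input to the user.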
Save this address in the Network struct:
struct Network {
// ...
EfaAddress addr;
// ...
};
Network Network::Open(struct fi_info *fi) {
// ...
return Network{fi, fabric, domain, cq, av, ep, EfaAddress(addr)};
}
Adding Target Address
In previous chapters, we mentioned that, to reduce address resolution overhead, libfabric requires the target address to be added to the address vector before initiating network operations. The fi_av_insert() function adds addresses and returns an fi_addr_t handle for each one. Let’s add a method that adds a peer address:
fi_addr_t Network::AddPeerAddress(const EfaAddress &peer_addr) {
fi_addr_t addr = FI_ADDR_UNSPEC;
int ret = fi_av_insert(av, peer_addr.bytes, 1, &addr, 0, nullptr);
if (ret != 1) {
fprintf(stderr, "fi_av_insert failed: %d\n", ret);
std::exit(1);
}
return addr;
}
Buffers
In libfabric, data transmission buffers are provided by the user. We’ll use a simple struct to store basic buffer information. Besides the pointer and length, one thing to note is memory alignment. The aws-ofi-nccl code uses 128-byte alignment. Although I haven’t found any alignment requirement in the libfabric or EFA documentation, we’ll also use 128-byte alignment to be safe.
constexpr size_t kBufAlign = 128; // Conservative choice, following aws-ofi-nccl
void *align_up(void *ptr, size_t align) {
uintptr_t addr = (uintptr_t)ptr;
return (void *)((addr + align - 1) & ~(align - 1));
}
struct Buffer {
void *data;
size_t size;
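static Buffer Alloc(size_t size, size_t align) {
// Over-allocate by align - 1 bytes so that at least `size` usable bytes
// remain after the data pointer is aligned up.
void *raw_data = malloc(size + align - 1);
CHECK(raw_data != nullptr);
return Buffer(raw_data, size + align - 1, align);
}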
Buffer(Buffer &&other)
: data(other.data), size(other.size), raw_data(other.raw_data) {
other.data = nullptr;
other.raw_data = nullptr;
}
~Buffer() { free(raw_data); }
private:
void *raw_data;
Buffer(void *raw_data, size_t raw_size, size_t align) {
this->raw_data = raw_data;
this->data = align_up(raw_data, align);
this->size = (size_t)((uintptr_t)raw_data + raw_size - (uintptr_t)data);
}
Buffer(const Buffer &) = delete;
};
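An alternative, assuming a C++17 standard library that provides std::aligned_alloc (glibc does), is to let the allocator do the alignment, which removes the need to track a separate raw pointer. This sketch keeps the chapter’s 128-byte alignment; AlignedBuffer, RoundUp, and FreeDeleter are hypothetical names, not part of the chapter’s code:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>

// Round n up to the next multiple of align (align must be a power of two).
constexpr size_t RoundUp(size_t n, size_t align) {
  return (n + align - 1) & ~(align - 1);
}

// Releases memory obtained from std::aligned_alloc.
struct FreeDeleter {
  void operator()(void *p) const { std::free(p); }
};

// Hypothetical alternative to Buffer: the pointer returned by
// std::aligned_alloc is already aligned, so it is both the data pointer and
// the pointer to free. unique_ptr also makes the type move-only for free.
struct AlignedBuffer {
  std::unique_ptr<void, FreeDeleter> data;
  size_t size;

  static AlignedBuffer Alloc(size_t size, size_t align) {
    // aligned_alloc has traditionally required size to be a multiple of align.
    size_t padded = RoundUp(size, align);
    void *p = std::aligned_alloc(align, padded);
    assert(p != nullptr);
    return AlignedBuffer{std::unique_ptr<void, FreeDeleter>(p), padded};
  }
};
```

With this layout, registration would use buf.data.get() and buf.size; the trade-off is a dependency on aligned_alloc being available, which the malloc-plus-align_up approach avoids.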
Registering Memory Regions
As mentioned in previous chapters, to let the network card access buffers in the user’s virtual address space, we need to register these memory regions with fi_mr_regattr().
struct Network {
// ...
std::unordered_map<void *, struct fid_mr *> mr;
void RegisterMemory(Buffer &buf);
struct fid_mr *GetMR(const Buffer &buf);
};
void Network::RegisterMemory(Buffer &buf) {
struct fid_mr *mr;
struct fi_mr_attr mr_attr = {};
struct iovec iov = {.iov_base = buf.data, .iov_len = buf.size};
mr_attr.mr_iov = &iov;
mr_attr.iov_count = 1;
mr_attr.access = FI_SEND | FI_RECV;
uint64_t flags = 0;
FI_CHECK(fi_mr_regattr(domain, &mr_attr, flags, &mr));
this->mr[buf.data] = mr;
}
struct fid_mr *Network::GetMR(const Buffer &buf) {
auto it = mr.find(buf.data);
CHECK(it != mr.end());
return it->second;
}
Submitting Operations
Next, let’s look at how to submit an RDMA network operation and how to maintain operation context.
Operation Context
When a network operation completes, we need to trigger the follow-up application logic. To tell operations apart, we use an RdmaOp struct to store each operation’s type, data, and callback function. We pass this RdmaOp as the context when submitting an operation, and when the operation completes, we use this context to invoke the callback.
enum class RdmaOpType : uint8_t {
kRecv = 0,
kSend = 1,
};
struct RdmaRecvOp {
Buffer *buf;
fi_addr_t src_addr; // Set after completion
size_t recv_size; // Set after completion
};
static_assert(std::is_pod_v<RdmaRecvOp>);
struct RdmaSendOp {
Buffer *buf;
size_t len;
fi_addr_t dest_addr;
};
static_assert(std::is_pod_v<RdmaSendOp>);
struct RdmaOp {
RdmaOpType type;
union {
RdmaRecvOp recv;
RdmaSendOp send;
};
std::function<void(Network &, RdmaOp &)> callback;
};
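To make the context round trip concrete, here is a standalone sketch that needs no libfabric: a hypothetical fake_cq vector stands in for the completion queue, and a stripped-down Op (no union) stands in for RdmaOp. It follows the same pattern as this chapter: allocate the op with new at submission, recover it from the void * context on completion, run the callback, and delete it exactly once.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

enum class OpType { kRecv, kSend };

// Simplified stand-in for RdmaOp: a type tag, one completion field, a callback.
struct Op {
  OpType type;
  size_t len;  // set on completion for receives
  std::function<void(Op &)> callback;
};

// Stand-in for the completion queue: it just remembers the contexts we passed.
std::vector<void *> fake_cq;

// Like PostRecv()/PostSend(): heap-allocate the op and pass it as the context
// (with libfabric this would be fi_msg.context = op).
void Post(OpType type, std::function<void(Op &)> cb) {
  auto *op = new Op{type, 0, std::move(cb)};
  fake_cq.push_back(op);
}

// Like HandleCompletion(): cast the context back, fill in results, run the
// callback, then free the op.
void Complete(void *context, size_t len) {
  auto *op = static_cast<Op *>(context);
  if (!op) return;  // operation submitted without a context: ignore
  if (op->type == OpType::kRecv) op->len = len;
  if (op->callback) op->callback(*op);
  delete op;
}
```

Heap allocation keeps the op alive between submission and completion regardless of the caller’s stack frame, which is why the chapter uses new here.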
RECV Operation
To receive data, we need to submit a RECV operation. In libfabric, we use the fi_recvmsg() function to do so.
void Network::PostRecv(Buffer &buf,
std::function<void(Network &, RdmaOp &)> &&callback) {
auto *op = new RdmaOp{
.type = RdmaOpType::kRecv,
.recv =
RdmaRecvOp{.buf = &buf, .src_addr = FI_ADDR_UNSPEC, .recv_size = 0},
.callback = std::move(callback),
};
struct iovec iov = {
.iov_base = buf.data,
.iov_len = buf.size,
};
struct fi_msg msg = {
.msg_iov = &iov,
.desc = &GetMR(buf)->mem_desc,
.iov_count = 1,
.addr = FI_ADDR_UNSPEC,
.context = op,
};
FI_CHECK(fi_recvmsg(ep, &msg, 0)); // TODO: handle EAGAIN
}
In the code above, we first allocate an RdmaOp object directly with new; we’ll free it when the operation completes.
When calling fi_recvmsg(), we pass the buffer and its memory region descriptor, along with the RdmaOp object as the context. Since we don’t specify a source address (FI_ADDR_UNSPEC), this RECV operation can receive data from any address.
Notably, we don’t handle the -FI_EAGAIN return value of fi_recvmsg(): FI_CHECK simply exits on any nonzero return. When operations are submitted very frequently, fi_recvmsg() may return -FI_EAGAIN, indicating that the provider currently lacks the resources to accept another operation. In that case, we need to let the network make progress (for example, by polling the completion queue) and then submit the RECV operation again. Since this chapter only sends and receives one message, we’re unlikely to hit this case, so we’ll handle it in later chapters.
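One way to handle -FI_EAGAIN, sketched here without libfabric, is to retry the submission while polling the completion queue between attempts. PostWithRetry is a hypothetical helper; the post and progress callables stand in for fi_recvmsg()/fi_sendmsg() and Network::PollCompletion(), and EAGAIN from <cerrno> stands in for FI_EAGAIN:

```cpp
#include <cassert>
#include <cerrno>
#include <functional>

// Hypothetical helper. `post` submits one operation and returns 0 on success
// or a negative error code; `progress` drives the completion queue so that
// finished operations release their resources.
// Returns the number of attempts on success, or -1 on any error other than
// -EAGAIN (a real error that retrying will not fix).
int PostWithRetry(const std::function<long()> &post,
                  const std::function<void()> &progress) {
  for (int attempts = 1;; attempts++) {
    long rc = post();
    if (rc == 0) return attempts;
    if (rc != -EAGAIN) return -1;
    progress();  // let completions free up queue slots, then try again
  }
}
```

A caveat with this design: callbacks run during progress() may themselves post new operations (the server below re-posts RECV from its callback), so a real implementation might prefer to queue pending submissions rather than recurse.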
SEND Operation
The SEND operation is similar to the RECV operation, except that we need to specify the target address.
void Network::PostSend(fi_addr_t addr, Buffer &buf, size_t len,
std::function<void(Network &, RdmaOp &)> &&callback) {
CHECK(len <= buf.size);
auto *op = new RdmaOp{
.type = RdmaOpType::kSend,
.send = RdmaSendOp{.buf = &buf, .len = len, .dest_addr = addr},
.callback = std::move(callback),
};
struct iovec iov = {
.iov_base = buf.data,
.iov_len = len,
};
struct fi_msg msg = {
.msg_iov = &iov,
.desc = &GetMR(buf)->mem_desc,
.iov_count = 1,
.addr = addr,
.context = op,
};
FI_CHECK(fi_sendmsg(ep, &msg, 0)); // TODO: handle EAGAIN
}
Polling the Completion Queue
We need to poll the completion queue to get operation completion events, using the fi_cq_read() function. We pass it an array to hold the completion events, along with the array’s length. Since we specified the FI_CQ_FORMAT_DATA format earlier, we can allocate an array of fi_cq_data_entry on the stack. If there are more completion events than the array can hold, we call fi_cq_read() repeatedly until all events are read.
constexpr size_t kCompletionQueueReadCount = 16;
void Network::PollCompletion() {
struct fi_cq_data_entry cqe[kCompletionQueueReadCount];
for (;;) {
auto ret = fi_cq_read(cq, cqe, kCompletionQueueReadCount);
// ...
}
}
fi_cq_read() can return several different values:
- A positive value is the number of completion events read. We iterate through the cqe array and handle each completion event.
- -FI_EAGAIN means there are no more completion events, so we exit the loop.
- -FI_EAVAIL means there is an error event in the completion queue, which we read with fi_cq_readerr():
  - If fi_cq_readerr() returns 1, it successfully read one error event, and we print the error message.
  - If fi_cq_readerr() returns any other value, another error occurred, and we exit the program.
- Any other value means fi_cq_read() itself failed, and we exit the program.
// Loop body inside Network::PollCompletion(), after fi_cq_read():
if (ret > 0) {
for (ssize_t i = 0; i < ret; i++) {
HandleCompletion(*this, cqe[i]);
}
} else if (ret == -FI_EAGAIN) {
break; // No more completions
} else if (ret == -FI_EAVAIL) {
struct fi_cq_err_entry err_entry = {}; // zero-init: err_data_size is read as an input
ret = fi_cq_readerr(cq, &err_entry, 0);
if (ret > 0) {
fprintf(stderr, "Failed libfabric operation: %s\n",
fi_cq_strerror(cq, err_entry.prov_errno, err_entry.err_data,
nullptr, 0));
} else {
fprintf(stderr, "fi_cq_readerr error: %zd (%s)\n", ret,
fi_strerror(-ret));
std::exit(1);
}
} else {
fprintf(stderr, "fi_cq_read error: %zd (%s)\n", ret, fi_strerror(-ret));
std::exit(1);
}
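To see the drain-until-EAGAIN loop in isolation, here is a standalone simulation, assuming a hypothetical in-memory FakeCq in place of fid_cq and EAGAIN in place of FI_EAGAIN. With 40 pending entries and a batch size of 16, the loop reads three batches (16, 16, 8) before the queue reports that it is empty.

```cpp
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <deque>
#include <sys/types.h>
#include <vector>

// Hypothetical stand-in for a completion queue; each entry is just an int id.
struct FakeCq {
  std::deque<int> pending;

  // Mimics fi_cq_read(): copy up to `count` entries into `out` and return how
  // many were copied, or -EAGAIN if nothing is pending.
  ssize_t Read(int *out, size_t count) {
    if (pending.empty()) return -EAGAIN;
    size_t n = std::min(count, pending.size());
    for (size_t i = 0; i < n; i++) {
      out[i] = pending.front();
      pending.pop_front();
    }
    return static_cast<ssize_t>(n);
  }
};

constexpr size_t kBatch = 16;  // plays the role of kCompletionQueueReadCount

// Same loop shape as Network::PollCompletion(): read fixed-size batches until
// the queue reports -EAGAIN. Returns the number of batches read, or -1 on
// any other error (the real code handles -FI_EAVAIL there).
int DrainCompletions(FakeCq &cq, std::vector<int> &handled) {
  int entries[kBatch];
  int batches = 0;
  for (;;) {
    ssize_t ret = cq.Read(entries, kBatch);
    if (ret == -EAGAIN) break;  // nothing left to read
    if (ret < 0) return -1;
    for (ssize_t i = 0; i < ret; i++) handled.push_back(entries[i]);
    batches++;
  }
  return batches;
}
```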
When handling a completion event, we first read op_context. If op_context == nullptr, no context was provided when the operation was submitted, so we ignore the event.
Otherwise, we determine the type of completion event from flags. If it’s FI_RECV, we record the received length len and call the callback. If it’s FI_SEND, we simply call the callback.
Finally, we free the RdmaOp object.
void HandleCompletion(Network &net, const struct fi_cq_data_entry &cqe) {
auto comp_flags = cqe.flags;
auto op = (RdmaOp *)cqe.op_context;
if (!op) {
return;
}
if (comp_flags & FI_RECV) {
op->recv.recv_size = cqe.len;
if (op->callback)
op->callback(net, *op);
} else if (comp_flags & FI_SEND) {
if (op->callback)
op->callback(net, *op);
} else {
fprintf(stderr, "Unhandled completion type. comp_flags=%lx\n", comp_flags);
std::exit(1);
}
delete op;
}
Since this chapter only has RECV and SEND operations, we only handle these two types of completion events. Later chapters will handle more operation types.
Server Logic
Having completed all the preparation work above, we can now write the server logic.
On the server side, we first open the network interface, then register a buffer, then submit a RECV operation. When the RECV operation completes, we print the received data and submit another RECV operation.
How does the client learn the server’s address? Many example programs either open a separate TCP socket to exchange addresses or fork() a child process to run the client. Here we take a simpler approach: the server prints its address, and the user passes it to the client manually.
constexpr size_t kMessageBufferSize = 8192;
int ServerMain(int argc, char **argv) {
struct fi_info *info = GetInfo();
auto net = Network::Open(info);
printf("domain: %14s", info->domain_attr->name);
printf(", nic: %10s", info->nic->device_attr->name);
printf(", fabric: %s", info->fabric_attr->prov_name);
printf(", link: %.0fGbps", info->nic->link_attr->speed / 1e9);
printf("\n");
printf("Run client with the following command:\n");
printf(" %s %s\n", argv[0], net.addr.ToString().c_str());
printf(" %s %s \"anytext\"\n", argv[0], net.addr.ToString().c_str());
printf("------\n");
auto buf_msg = Buffer::Alloc(kMessageBufferSize, kBufAlign);
net.RegisterMemory(buf_msg);
net.PostRecv(buf_msg, [](Network &net, RdmaOp &op) {
auto *msg = (const char *)op.recv.buf->data;
auto len = op.recv.recv_size;
printf("Received message (len=%zu): %.*s\n", len, (int)len, msg);
net.PostRecv(*op.recv.buf, std::move(op.callback));
});
for (;;) {
net.PollCompletion();
}
return 0;
}
Client Logic
The client logic is similar to the server logic. We first open the network interface, then add the server’s address to the address vector, register a buffer, and submit a SEND operation. When the SEND operation completes, the client exits.
int ClientMain(int argc, char **argv) {
CHECK(argc == 2 || argc == 3);
auto server_addrname = EfaAddress::Parse(argv[1]);
std::string message = argc == 3 ? argv[2] : "Hello, world!";
struct fi_info *info = GetInfo();
auto net = Network::Open(info);
printf("domain: %14s", info->domain_attr->name);
printf(", nic: %10s", info->nic->device_attr->name);
printf(", fabric: %s", info->fabric_attr->prov_name);
printf(", link: %.0fGbps", info->nic->link_attr->speed / 1e9);
printf("\n");
auto server_addr = net.AddPeerAddress(server_addrname);
auto buf_msg = Buffer::Alloc(kMessageBufferSize, kBufAlign);
net.RegisterMemory(buf_msg);
CHECK(message.size() <= buf_msg.size); // don't overflow the message buffer
memcpy(buf_msg.data, message.data(), message.size());
bool sent = false;
net.PostSend(server_addr, buf_msg, message.size(),
[&sent](Network &net, RdmaOp &op) {
auto *msg = (const char *)op.send.buf->data;
auto len = op.send.len;
printf("Sent message (len=%zu): %.*s\n", len, (int)len, msg);
sent = true;
});
while (!sent) {
net.PollCompletion();
}
return 0;
}
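The section doesn’t show main(). Judging from the commands the server prints (the program name followed by the server address and an optional message), a minimal dispatcher could pick the role from the argument count. This is a sketch under that assumption; Role and SelectRole are hypothetical names, and the commented main() refers to ServerMain()/ClientMain() from this chapter:

```cpp
#include <cassert>

// Hypothetical role selection: no arguments -> server; the server address
// (plus an optional message) -> client; anything else -> usage error.
enum class Role { kServer, kClient, kUsageError };

Role SelectRole(int argc) {
  if (argc == 1) return Role::kServer;
  if (argc == 2 || argc == 3) return Role::kClient;
  return Role::kUsageError;
}

// A possible main() built on it:
// int main(int argc, char **argv) {
//   switch (SelectRole(argc)) {
//     case Role::kServer: return ServerMain(argc, argv);
//     case Role::kClient: return ClientMain(argc, argv);
//     default:
//       fprintf(stderr, "usage: %s [server_address [message]]\n", argv[0]);
//       return 1;
//   }
// }
```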
Results
Chapter code: https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/4_hello.cpp