if(HAVE_RDMA)
list(APPEND async_rdma_common_srcs
msg/async/rdma/Infiniband.cc
+ msg/async/rdma/Device.cc
msg/async/rdma/RDMAConnectedSocketImpl.cc
msg/async/rdma/RDMAServerSocketImpl.cc
msg/async/rdma/RDMAStack.cc)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Infiniband.h"
+#include "Device.h"
+#include "common/errno.h"
+#include "common/debug.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "IBDevice "
+
+/**
+ * Query one physical port of an opened RDMA device and cache its LID and GID.
+ *
+ * @param cct   Ceph context used for logging and for reading the
+ *              ms_async_rdma_local_gid / ms_async_rdma_roce_ver options.
+ * @param ictxt verbs context of the already-opened device.
+ * @param ipn   1-based physical port number to query.
+ *
+ * Any verbs failure is fatal: it is logged and the process aborts.
+ * With HAVE_IBV_EXP the GID table is searched for the configured GID/RoCE
+ * type (falling back to index 0 when no parsable GID is configured);
+ * otherwise GID index 0 is always used.
+ */
+Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
+{
+  // ibv_query_port() reports failure with a non-zero value, so any
+  // non-zero result is treated as an error (same as the ibv_query_gid()
+  // checks below), not only -1.
+  int r = ibv_query_port(ctxt, port_num, port_attr);
+  if (r) {
+    lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+
+  lid = port_attr->lid;
+
+#ifdef HAVE_IBV_EXP
+  union ibv_gid cgid;
+  struct ibv_exp_gid_attr gid_attr;
+  bool malformed = false;
+
+  ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
+
+  // search for requested GID in GIDs table
+  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
+    << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
+  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
+	     "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
+	     ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
+	     &cgid.raw[ 0], &cgid.raw[ 1],
+	     &cgid.raw[ 2], &cgid.raw[ 3],
+	     &cgid.raw[ 4], &cgid.raw[ 5],
+	     &cgid.raw[ 6], &cgid.raw[ 7],
+	     &cgid.raw[ 8], &cgid.raw[ 9],
+	     &cgid.raw[10], &cgid.raw[11],
+	     &cgid.raw[12], &cgid.raw[13],
+	     &cgid.raw[14], &cgid.raw[15]);
+
+  // All 16 bytes must parse; anything else means "no usable GID configured".
+  if (r != 16) {
+    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
+    malformed = true;
+  }
+
+  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
+
+  for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
+    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+    if (r) {
+      lderr(cct) << __func__  << " query gid of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
+      ceph_abort();
+    }
+    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
+    if (r) {
+      lderr(cct) << __func__  << " query gid attributes of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
+      ceph_abort();
+    }
+
+    if (malformed) break; // stay with gid_idx=0
+    if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
+	 (memcmp(&gid, &cgid, 16) == 0) ) {
+      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
+      break;
+    }
+  }
+
+  // Falling off the end of the table means no entry matched.
+  if (gid_idx == port_attr->gid_tbl_len) {
+    lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
+    ceph_abort();
+  }
+#else
+  // Without experimental verbs GID index 0 is always used; record it so
+  // get_gid_idx() does not return an uninitialised value.
+  gid_idx = 0;
+  r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+  if (r) {
+    lderr(cct) << __func__  << " query gid failed  " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+#endif
+}
+
+
+/**
+ * Open an RDMA device and cache its attributes.
+ *
+ * @param cct Ceph context used for logging.
+ * @param d   entry from the verbs device list; must not be NULL.
+ *
+ * Any failure (NULL device, open failure, attribute query failure) is
+ * logged and aborts the process. No port is bound here; active_port stays
+ * nullptr until binding_port() is called.
+ */
+Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
+{
+  if (device == NULL) {
+    lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+  name = ibv_get_device_name(device);
+  ctxt = ibv_open_device(device);
+  if (ctxt == NULL) {
+    lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+  // ibv_query_device() signals failure with a non-zero value, so test for
+  // "not zero" rather than for -1 only.
+  int r = ibv_query_device(ctxt, device_attr);
+  if (r != 0) {
+    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+}
+
+/**
+ * Release everything the constructor and binding_port() acquired.
+ *
+ * The verbs context is opened unconditionally in the constructor, so it is
+ * closed unconditionally here (previously it was only closed when a port
+ * had been bound, leaking the context of every other device). The bound
+ * port is destroyed first because it holds a pointer to the context.
+ */
+Device::~Device()
+{
+  delete active_port;   // deleting nullptr is a no-op
+  active_port = nullptr;
+
+  // Keep the call outside assert() so it still runs if asserts are
+  // compiled out.
+  int r = ibv_close_device(ctxt);
+  assert(r == 0);
+  (void)r;
+
+  delete device_attr;
+}
+
+/**
+ * Select the physical port this device will use.
+ *
+ * Walks the device's ports in ascending order (1-based), building a Port
+ * for each one, and keeps the first whose number equals @p port_num and
+ * whose state is IBV_PORT_ACTIVE. Ports that do not qualify are logged at
+ * level 10 and destroyed. Asserts if no port qualifies.
+ */
+void Device::binding_port(CephContext *cct, int port_num) {
+  port_cnt = device_attr->phys_port_cnt;
+
+  for (int candidate = 1; candidate <= port_cnt; ++candidate) {
+    Port *probe = new Port(cct, ctxt, candidate);
+    const bool wanted = (candidate == port_num) &&
+                        (probe->get_port_attr()->state == IBV_PORT_ACTIVE);
+    if (wanted) {
+      active_port = probe;
+      ldout(cct, 1) << __func__ << " found active port " << candidate << dendl;
+      break;
+    }
+
+    ldout(cct, 10) << __func__ << " port " << candidate << " is not what we want. state: " << probe->get_port_attr()->state << ")"<< dendl;
+    delete probe;
+  }
+
+  if (active_port == nullptr) {
+    lderr(cct) << __func__ << "  port not found" << dendl;
+    assert(active_port);
+  }
+}
+
+
+/**
+ * Enumerate the RDMA devices on this host and wrap each one in a Device.
+ *
+ * Aborts (after logging) when the verbs library returns no list or an
+ * empty list.
+ */
+DeviceList::DeviceList(CephContext *cct)
+  : device_list(ibv_get_device_list(&num))
+{
+  const bool have_devices = (device_list != NULL) && (num != 0);
+  if (!have_devices) {
+    lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;
+    ceph_abort();
+  }
+
+  devices = new Device*[num];
+
+  int idx = 0;
+  while (idx < num) {
+    devices[idx] = new Device(cct, device_list[idx]);
+    ++idx;
+  }
+}
+
+/**
+ * Destroy every wrapped Device (in list order), then the wrapper array,
+ * and finally hand the raw device list back to the verbs library.
+ */
+DeviceList::~DeviceList()
+{
+  Device **const last = devices + num;
+  for (Device **cur = devices; cur != last; ++cur) {
+    delete *cur;
+  }
+  delete [] devices;
+
+  ibv_free_device_list(device_list);
+}
+
+/**
+ * Look up a device by name.
+ *
+ * @param device_name name to match; an empty string matches the first
+ *                    device in the list.
+ * @return the matching Device, or NULL when no device has that name.
+ */
+Device* DeviceList::get_device(const char* device_name)
+{
+  assert(devices);
+
+  int pos = 0;
+  while (pos < num) {
+    Device *candidate = devices[pos];
+    const bool accept_any = (strlen(device_name) == 0);
+    if (accept_any || strcmp(device_name, candidate->get_name()) == 0) {
+      return candidate;
+    }
+    ++pos;
+  }
+  return NULL;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RDMA_DEVICE_H
+#define CEPH_RDMA_DEVICE_H
+
+#include <infiniband/verbs.h>
+
+#include <string>
+#include <vector>
+
+#include "include/int_types.h"
+#include "include/page.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/msg_types.h"
+#include "msg/async/net_handler.h"
+#include "common/Mutex.h"
+
+/**
+ * One physical port of an RDMA device.
+ *
+ * The constructor queries the port through the given verbs context and
+ * caches its attributes, LID and GID. The attribute structure is heap
+ * allocated by the constructor and owned by this object, so the class is
+ * non-copyable and frees it on destruction.
+ */
+class Port {
+  struct ibv_context* ctxt;         // verbs context of the owning device (not owned)
+  int port_num;                     // 1-based physical port number
+  struct ibv_port_attr* port_attr;  // owned; allocated in the constructor
+  uint16_t lid;                     // copied from port_attr->lid
+  int gid_idx = 0;                  // index of `gid` in the port's GID table
+  union ibv_gid gid;
+
+ public:
+  explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn);
+  // Release the attribute block allocated by the constructor.
+  ~Port() { delete port_attr; }
+  // Copying would double-free port_attr.
+  Port(const Port&) = delete;
+  Port& operator=(const Port&) = delete;
+
+  uint16_t get_lid() { return lid; }
+  ibv_gid  get_gid() { return gid; }
+  int get_port_num() { return port_num; }
+  // The returned pointer is only valid while this Port is alive.
+  ibv_port_attr* get_port_attr() { return port_attr; }
+  int get_gid_idx() { return gid_idx; }
+};
+
+
+class Device { // One RDMA device: the opened verbs context, its attributes and the bound port.
+ ibv_device *device; // entry handed to the constructor; must be non-NULL
+ const char* name; // result of ibv_get_device_name(device)
+ uint8_t port_cnt; // set from device_attr->phys_port_cnt in binding_port()
+ public:
+ explicit Device(CephContext *c, ibv_device* d); // opens the device and queries its attributes; aborts on failure
+ ~Device();
+
+ const char* get_name() { return name;}
+ uint16_t get_lid() { return active_port->get_lid(); } // dereferences active_port: call only after binding_port()
+ ibv_gid get_gid() { return active_port->get_gid(); } // dereferences active_port: call only after binding_port()
+ int get_gid_idx() { return active_port->get_gid_idx(); } // dereferences active_port: call only after binding_port()
+ void binding_port(CephContext *c, int port_num); // picks the ACTIVE port numbered port_num (1-based) and stores it in active_port
+ struct ibv_context *ctxt; // context returned by ibv_open_device() in the constructor
+ ibv_device_attr *device_attr; // allocated in the constructor, filled by ibv_query_device()
+ Port* active_port; // nullptr until binding_port() finds a matching port
+};
+
+
+class DeviceList { // Wraps every device returned by ibv_get_device_list() in a Device object.
+ struct ibv_device ** device_list; // raw list from ibv_get_device_list(); released with ibv_free_device_list() in the destructor
+ int num; // entry count written by ibv_get_device_list(&num)
+ Device** devices; // array of num Device objects, one per device_list entry; deleted in the destructor
+ public:
+ DeviceList(CephContext *cct); // aborts when the list is NULL or empty
+ ~DeviceList();
+
+ Device* get_device(const char* device_name); // empty name returns the first device; otherwise the device with that name, or NULL
+};
+
+#endif
*/
#include "Infiniband.h"
+#include "Device.h"
+
#include "common/errno.h"
#include "common/debug.h"
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
static const uint32_t CQ_DEPTH = 30000;
-Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
-{
-#ifdef HAVE_IBV_EXP
- union ibv_gid cgid;
- struct ibv_exp_gid_attr gid_attr;
- bool malformed = false;
-
- ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
- int r = ibv_query_port(ctxt, port_num, port_attr);
- if (r == -1) {
- lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-
- lid = port_attr->lid;
-
- // search for requested GID in GIDs table
- ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
- << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
- r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
- "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
- ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
- &cgid.raw[ 0], &cgid.raw[ 1],
- &cgid.raw[ 2], &cgid.raw[ 3],
- &cgid.raw[ 4], &cgid.raw[ 5],
- &cgid.raw[ 6], &cgid.raw[ 7],
- &cgid.raw[ 8], &cgid.raw[ 9],
- &cgid.raw[10], &cgid.raw[11],
- &cgid.raw[12], &cgid.raw[13],
- &cgid.raw[14], &cgid.raw[15]);
-
- if (r != 16) {
- ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
- malformed = true;
- }
-
- gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
-
- for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
- r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
- if (r) {
- lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
- if (r) {
- lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-
- if (malformed) break; // stay with gid_idx=0
- if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
- (memcmp(&gid, &cgid, 16) == 0) ) {
- ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
- break;
- }
- }
-
- if (gid_idx == port_attr->gid_tbl_len) {
- lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
- ceph_abort();
- }
-#else
- int r = ibv_query_port(ctxt, port_num, port_attr);
- if (r == -1) {
- lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-
- lid = port_attr->lid;
- r = ibv_query_gid(ctxt, port_num, 0, &gid);
- if (r) {
- lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-#endif
-}
-
-
-Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
-{
- if (device == NULL) {
- lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- name = ibv_get_device_name(device);
- ctxt = ibv_open_device(device);
- if (ctxt == NULL) {
- lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- int r = ibv_query_device(ctxt, device_attr);
- if (r == -1) {
- lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
-}
-
-void Device::binding_port(CephContext *cct, int port_num) {
- port_cnt = device_attr->phys_port_cnt;
- for (uint8_t i = 0; i < port_cnt; ++i) {
- Port *port = new Port(cct, ctxt, i+1);
- if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
- active_port = port;
- ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
- break;
- } else {
- ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
- }
- delete port;
- }
- if (nullptr == active_port) {
- lderr(cct) << __func__ << " port not found" << dendl;
- assert(active_port);
- }
-}
-
-
Infiniband::QueuePair::QueuePair(
CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int port, ibv_srq *srq,
}
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
+ : device_list(new DeviceList(cct))
{
- device = device_list.get_device(device_name.c_str());
+ device = device_list->get_device(device_name.c_str());
device->binding_port(cct, port_num);
assert(device);
ib_physical_port = device->active_port->get_port_num();
assert(ibv_destroy_srq(srq) == 0);
delete memory_manager;
delete pd;
+ delete device_list;
}
/**
class RDMAStack;
class CephContext;
-
-class Port {
- struct ibv_context* ctxt;
- int port_num;
- struct ibv_port_attr* port_attr;
- uint16_t lid;
- int gid_idx;
- union ibv_gid gid;
-
- public:
- explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn);
- uint16_t get_lid() { return lid; }
- ibv_gid get_gid() { return gid; }
- int get_port_num() { return port_num; }
- ibv_port_attr* get_port_attr() { return port_attr; }
- int get_gid_idx() { return gid_idx; }
-};
-
-
-class Device {
- ibv_device *device;
- const char* name;
- uint8_t port_cnt;
- public:
- explicit Device(CephContext *c, ibv_device* d);
- ~Device() {
- if (active_port) {
- delete active_port;
- assert(ibv_close_device(ctxt) == 0);
- }
- }
- const char* get_name() { return name;}
- uint16_t get_lid() { return active_port->get_lid(); }
- ibv_gid get_gid() { return active_port->get_gid(); }
- int get_gid_idx() { return active_port->get_gid_idx(); }
- void binding_port(CephContext *c, int port_num);
- struct ibv_context *ctxt;
- ibv_device_attr *device_attr;
- Port* active_port;
-};
-
-
-class DeviceList {
- struct ibv_device ** device_list;
- int num;
- Device** devices;
- public:
- DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {
- if (device_list == NULL || num == 0) {
- lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
- ceph_abort();
- }
- devices = new Device*[num];
-
- for (int i = 0;i < num; ++i) {
- devices[i] = new Device(cct, device_list[i]);
- }
- }
- ~DeviceList() {
- for (int i=0; i < num; ++i) {
- delete devices[i];
- }
- delete []devices;
- ibv_free_device_list(device_list);
- }
-
- Device* get_device(const char* device_name) {
- assert(devices);
- for (int i = 0; i < num; ++i) {
- if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
- return devices[i];
- }
- }
- return NULL;
- }
-};
-
+class Port;
+class Device;
+class DeviceList;
class Infiniband {
public:
ibv_srq* srq; // shared receive work queue
Device *device;
ProtectionDomain *pd;
- DeviceList device_list;
+ DeviceList *device_list;
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
uint8_t get_ib_physical_port() { return ib_physical_port; }
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
- uint16_t get_lid() { return device->get_lid(); }
- ibv_gid get_gid() { return device->get_gid(); }
MemoryManager* get_memory_manager() { return memory_manager; }
Device* get_device() { return device; }
- int get_async_fd() { return device->ctxt->async_fd; }
bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
*/
#include "RDMAStack.h"
+#include "Device.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
is_server(false), con_handler(new C_handle_connection(this)),
active(false)
{
+ ibdev = ib->get_device();
+ ibport = ib->get_ib_physical_port();
+
qp = infiniband->create_queue_pair(
cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
- my_msg.lid = infiniband->get_lid();
+ my_msg.lid = ibdev->get_lid();
my_msg.peer_qpn = 0;
- my_msg.gid = infiniband->get_gid();
+ my_msg.gid = ibdev->get_gid();
notify_fd = dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
void RDMAConnectedSocketImpl::notify()
{
uint64_t i = 1;
- write(notify_fd, &i, sizeof(i));
+ int ret;
+
+ ret = write(notify_fd, &i, sizeof(i));
+ assert(ret = sizeof(i));
}
void RDMAConnectedSocketImpl::shutdown()
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"
+#include "Device.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
private:
CephContext *cct;
Infiniband::QueuePair *qp;
+ Device *ibdev;
+ int ibport;
IBSYNMsg peer_msg;
IBSYNMsg my_msg;
int connected;