BuildRequires: nss-devel
BuildRequires: keyutils-libs-devel
BuildRequires: libibverbs-devel
+BuildRequires: librdmacm-devel
BuildRequires: openldap-devel
BuildRequires: openssl-devel
BuildRequires: CUnit-devel
set(RDMA_NAMES ${RDMA_NAMES} ibverbs)
find_library(RDMA_LIBRARY NAMES ${RDMA_NAMES})
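+
+# librdmacm provides the rdma_cm connection-management API used together
+# with libibverbs by the RDMA messenger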
+set(RDMACM_NAMES ${RDMACM_NAMES} rdmacm)
+find_library(RDMACM_LIBRARY NAMES ${RDMACM_NAMES})
+
if (RDMA_INCLUDE_DIR AND RDMA_LIBRARY)
set(RDMA_FOUND TRUE)
- set(RDMA_LIBRARIES ${RDMA_LIBRARY})
+ set(RDMA_LIBRARIES ${RDMA_LIBRARY} ${RDMACM_LIBRARY})
else ()
set(RDMA_FOUND FALSE)
set( RDMA_LIBRARIES )
if (RDMA_FOUND)
message(STATUS "Found libibverbs: ${RDMA_LIBRARY}")
+ message(STATUS "Found librdmacm: ${RDMACM_LIBRARY}")
include(CheckCXXSourceCompiles)
CHECK_CXX_SOURCE_COMPILES("
libfuse-dev,
libgoogle-perftools-dev [i386 amd64 arm64],
libibverbs-dev,
+ librdmacm-dev,
libkeyutils-dev,
libldap2-dev,
libleveldb-dev,
list(APPEND async_rdma_common_srcs
msg/async/rdma/Infiniband.cc
msg/async/rdma/RDMAConnectedSocketImpl.cc
+ msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
msg/async/rdma/RDMAServerSocketImpl.cc
+ msg/async/rdma/RDMAIWARPServerSocketImpl.cc
msg/async/rdma/RDMAStack.cc)
endif(HAVE_RDMA)
list(APPEND ceph_common_deps ${UDEV_LIBRARIES})
endif()
if(HAVE_RDMA)
- list(APPEND ceph_common_deps ${RDMA_LIBRARY})
+ list(APPEND ceph_common_deps ${RDMA_LIBRARIES})
endif()
if(NOT WITH_SYSTEM_BOOST)
list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
OPTION(ms_async_rdma_sl, OPT_INT) // in RoCE, this means PCP
OPTION(ms_async_rdma_dscp, OPT_INT) // in RoCE, this means DSCP
+// rdma connection management
+OPTION(ms_async_rdma_cm, OPT_BOOL)
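+// transport type: "ib" uses raw verbs; "iwarp" establishes connections via rdma_cm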
+OPTION(ms_async_rdma_type, OPT_STR)
+
OPTION(ms_dpdk_port_id, OPT_INT)
SAFE_OPTION(ms_dpdk_coremask, OPT_STR) // it is modified in unittest so that use SAFE_OPTION to declare
OPTION(ms_dpdk_memory_channel, OPT_STR)
.set_default(96)
.set_description(""),
+ Option("ms_async_rdma_cm", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+ .set_default(false)
+ .set_description(""),
+
+ Option("ms_async_rdma_type", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+ .set_default("ib")
+ .set_description(""),
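+ // a minimal ceph.conf sketch enabling the iWARP path (assumes an
+ // iWARP-capable NIC and an RDMA-enabled build):
+ //   ms_type = async+rdma
+ //   ms_async_rdma_cm = true
+ //   ms_async_rdma_type = iwarp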
+
Option("ms_dpdk_port_id", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description(""),
}
-Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
+Device::Device(CephContext *cct, ibv_device* d, struct ibv_context *dc)
+ : device(d), device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
ceph_abort();
}
name = ibv_get_device_name(device);
- ctxt = ibv_open_device(device);
+ if (cct->_conf->ms_async_rdma_cm) {
+ ctxt = dc;
+ } else {
+ ctxt = ibv_open_device(device);
+ }
if (ctxt == NULL) {
lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
ceph_abort();
CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
- uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key)
+ uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
: cct(c), infiniband(infiniband),
type(type),
ctxt(infiniband.device->ctxt),
pd(infiniband.pd->pd),
srq(srq),
qp(NULL),
+ cm_id(cid),
txcq(txcq),
rxcq(rxcq),
initial_psn(0),
qpia.qp_type = type; // RC, UC, UD, or XRC
qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
- qp = ibv_create_qp(pd, &qpia);
- if (qp == NULL) {
- lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
- if (errno == ENOMEM) {
- lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
- " ms_async_rdma_send_buffers or"
- " ms_async_rdma_buffer_size" << dendl;
+ if (!cct->_conf->ms_async_rdma_cm) {
+ qp = ibv_create_qp(pd, &qpia);
+ if (qp == NULL) {
+ lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+ if (errno == ENOMEM) {
+ lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
+ " ms_async_rdma_send_buffers or"
+ " ms_async_rdma_buffer_size" << dendl;
+ }
+ return -1;
}
- return -1;
+ } else {
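+   // in cm mode the QP is created through librdmacm on the cm_id, which
+   // must belong to the same device context as our protection domain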
+ assert(cm_id->verbs == pd->context);
+ if (rdma_create_qp(cm_id, pd, &qpia)) {
+ lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ qp = cm_id->qp;
}
-
ldout(cct, 20) << __func__ << " successfully create queue pair: "
<< "qp=" << qp << dendl;
+ if (cct->_conf->ms_async_rdma_cm)
+ return 0;
+
// move from RESET to INIT state
ibv_qp_attr qpa;
memset(&qpa, 0, sizeof(qpa));
* QueuePair on success or NULL if init fails
* See QueuePair::QueuePair for parameter documentation.
*/
-Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
+ CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
{
Infiniband::QueuePair *qp = new QueuePair(
- cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len);
+ cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
if (qp->init()) {
delete qp;
return NULL;
#include <include/assert.h>
#include <infiniband/verbs.h>
+#include <rdma/rdma_cma.h>
#include <atomic>
#include <string>
const char* name;
uint8_t port_cnt = 0;
public:
- explicit Device(CephContext *c, ibv_device* d);
+ explicit Device(CephContext *c, ibv_device* d, struct ibv_context *dc);
~Device() {
if (active_port) {
delete active_port;
class DeviceList {
struct ibv_device ** device_list;
+ struct ibv_context ** device_context_list;
int num;
Device** devices;
public:
- explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {
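+ // rdma_get_devices() hands back ibv_context objects that librdmacm has
+ // already opened; in cm mode Device reuses them instead of calling
+ // ibv_open_device()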
+ explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)),
+ device_context_list(rdma_get_devices(&num)) {
if (device_list == NULL || num == 0) {
lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
ceph_abort();
devices = new Device*[num];
for (int i = 0;i < num; ++i) {
- devices[i] = new Device(cct, device_list[i]);
+ devices[i] = new Device(cct, device_list[i], device_context_list[i]);
}
}
~DeviceList() {
int ib_physical_port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq,
Infiniband::CompletionQueue* rxcq,
- uint32_t tx_queue_len, uint32_t max_recv_wr, uint32_t q_key = 0);
+ uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
~QueuePair();
int init();
ibv_pd* pd; // protection domain
ibv_srq* srq; // shared receive queue
ibv_qp* qp; // infiniband verbs QP handle
+ struct rdma_cm_id *cm_id;
Infiniband::CompletionQueue* txcq;
Infiniband::CompletionQueue* rxcq;
uint32_t initial_psn; // initial packet sequence number
public:
typedef MemoryManager::Cluster Cluster;
typedef MemoryManager::Chunk Chunk;
- QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
+ QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*,
+ ibv_qp_type type, struct rdma_cm_id *cm_id);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
// post rx buffers to srq, return number of buffers actually posted
int post_chunks_to_srq(int num);
* Foundation. See file COPYING.
*
*/
-
#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
is_server(false), con_handler(new C_handle_connection(this)),
active(false), pending(false)
{
- qp = infiniband->create_queue_pair(
- cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
- my_msg.qpn = qp->get_local_qp_number();
- my_msg.psn = qp->get_initial_psn();
- my_msg.lid = infiniband->get_lid();
- my_msg.peer_qpn = 0;
- my_msg.gid = infiniband->get_gid();
- notify_fd = dispatcher->register_qp(qp, this);
- dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
- dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+ if (!cct->_conf->ms_async_rdma_cm) {
+ qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
+ my_msg.qpn = qp->get_local_qp_number();
+ my_msg.psn = qp->get_initial_psn();
+ my_msg.lid = infiniband->get_lid();
+ my_msg.peer_qpn = 0;
+ my_msg.gid = infiniband->get_gid();
+ notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+ dispatcher->register_qp(qp, this);
+ dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+ dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+ }
}
RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
void RDMAConnectedSocketImpl::fin() {
ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
+
wr.wr_id = reinterpret_cast<uint64_t>(qp);
wr.num_sge = 0;
wr.opcode = IBV_WR_SEND;
--- /dev/null
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl "
+
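+// address/route resolution timeout handed to rdma_resolve_addr() and
+// rdma_resolve_route(), and the retry count set in rdma_conn_param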
+#define TIMEOUT_MS 3000
+#define RETRY_COUNT 7
+
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+ RDMAWorker *w, RDMACMInfo *info)
+ : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
+{
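+ // a non-null info means the server-side accept() path already has a
+ // connected cm_id for us; otherwise this is a client socket and we
+ // create our own cm_id and event channel here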
+ status = IDLE;
+ notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+ if (info) {
+ is_server = true;
+ cm_id = info->cm_id;
+ cm_channel = info->cm_channel;
+ status = RDMA_ID_CREATED;
+ remote_qpn = info->qp_num;
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+ status = CHANNEL_FD_CREATED;
+ }, false);
+ if (alloc_resource()) {
+ close_notify();
+ return;
+ }
+ status = RESOURCE_ALLOCATED;
+ local_qpn = qp->get_local_qp_number();
+ my_msg.qpn = local_qpn;
+ } else {
+ is_server = false;
+ cm_channel = rdma_create_event_channel();
+ rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
+ status = RDMA_ID_CREATED;
+ ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
+ }
+}
+
+RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() {
+ ldout(cct, 20) << __func__ << " destruct." << dendl;
+ std::unique_lock<std::mutex> l(close_mtx);
+ close_condition.wait(l, [&] { return closed; });
+ if (status >= RDMA_ID_CREATED) {
+ rdma_destroy_id(cm_id);
+ rdma_destroy_event_channel(cm_channel);
+ }
+}
+
+int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+ worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+ status = CHANNEL_FD_CREATED;
+ if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) {
+ lderr(cct) << __func__ << " failed to resolve addr" << dendl;
+ return -1;
+ }
+ return 0;
+}
+
+void RDMAIWARPConnectedSocketImpl::close() {
+ error = ECONNRESET;
+ active = false;
+ if (status >= CONNECTED)
+ rdma_disconnect(cm_id);
+ close_notify();
+}
+
+void RDMAIWARPConnectedSocketImpl::shutdown() {
+ error = ECONNRESET;
+ active = false;
+}
+
+void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
+ struct rdma_cm_event *event;
+ if (rdma_get_cm_event(cm_channel, &event)) {
+   lderr(cct) << __func__ << " failed to get cm event: " << cpp_strerror(errno) << dendl;
+   return;
+ }
+ ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event)
+ << " (cm id: " << cm_id << ")" << dendl;
+ struct rdma_conn_param cm_params;
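+ // client-side connects walk ADDR_RESOLVED -> ROUTE_RESOLVED ->
+ // ESTABLISHED; on the server side this handler mainly sees
+ // ESTABLISHED and DISCONNECTED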
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ status = ADDR_RESOLVED;
+ if (rdma_resolve_route(cm_id, TIMEOUT_MS)) {
+ lderr(cct) << __func__ << " failed to resolve rdma route" << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ status = ROUTE_RESOLVED;
+ if (alloc_resource()) {
+ lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ break;
+ }
+ local_qpn = qp->get_local_qp_number();
+ my_msg.qpn = local_qpn;
+
+ memset(&cm_params, 0, sizeof(cm_params));
+ cm_params.retry_count = RETRY_COUNT;
+ cm_params.qp_num = local_qpn;
+ if (rdma_connect(cm_id, &cm_params)) {
+ lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ status = CONNECTED;
+ if (!is_server) {
+ remote_qpn = event->param.conn.qp_num;
+ activate();
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ case RDMA_CM_EVENT_UNREACHABLE:
+ case RDMA_CM_EVENT_REJECTED:
+ lderr(cct) << __func__ << " rdma connection failed: " << rdma_event_str(event->event) << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ status = DISCONNECTED;
+ close_notify();
+ if (!error) {
+ error = ECONNRESET;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ break;
+
+ default:
+ assert(0 == "unhandled event");
+ break;
+ }
+ rdma_ack_cm_event(event);
+}
+
+void RDMAIWARPConnectedSocketImpl::activate() {
+ ldout(cct, 30) << __func__ << dendl;
+ active = true;
+ connected = 1;
+}
+
+int RDMAIWARPConnectedSocketImpl::alloc_resource() {
+ ldout(cct, 30) << __func__ << dendl;
+ qp = infiniband->create_queue_pair(cct, dispatcher->get_tx_cq(),
+ dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
+ if (!qp) {
+ return -1;
+ }
+ dispatcher->register_qp(qp, this);
+ dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+ dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+ return 0;
+}
+
+void RDMAIWARPConnectedSocketImpl::close_notify() {
+ ldout(cct, 30) << __func__ << dendl;
+ if (status >= CHANNEL_FD_CREATED) {
+ worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE);
+ }
+ std::unique_lock<std::mutex> l(close_mtx);
+ if (!closed) {
+ closed = true;
+ close_condition.notify_all();
+ }
+}
--- /dev/null
+#include <poll.h>
+
+#include "msg/async/net_handler.h"
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
+
+RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband* i,
+ RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+ : RDMAServerSocketImpl(cct, i, s, w, a)
+{
+}
+
+int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+{
+ ldout(cct, 20) << __func__ << " bind to rdma address " << sa << dendl;
+ cm_channel = rdma_create_event_channel();
+ if (rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP)) {
+   lderr(cct) << __func__ << " failed to create cm id: " << cpp_strerror(errno) << dendl;
+   rdma_destroy_event_channel(cm_channel);
+   return -errno;
+ }
+ ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
+ int rc = rdma_bind_addr(cm_id, const_cast<struct sockaddr*>(sa.get_sockaddr()));
+ if (rc < 0) {
+ rc = -errno;
+ ldout(cct, 10) << __func__ << " unable to bind to " << sa
+   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+ goto err;
+ }
+ rc = rdma_listen(cm_id, 128);
+ if (rc < 0) {
+ rc = -errno;
+ ldout(cct, 10) << __func__ << " unable to listen on " << sa
+   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+ goto err;
+ }
+ server_setup_socket = cm_channel->fd;
+ ldout(cct, 20) << __func__ << " fd of cm_channel is " << server_setup_socket << dendl;
+ return 0;
+
+err:
+ server_setup_socket = -1;
+ rdma_destroy_id(cm_id);
+ rdma_destroy_event_channel(cm_channel);
+ return rc;
+}
+
+int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt,
+ entity_addr_t *out, Worker *w)
+{
+ ldout(cct, 15) << __func__ << dendl;
+
+ assert(sock);
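+ // poll the cm event channel with zero timeout so accept() stays
+ // non-blocking when no connection request is pending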
+ struct pollfd pfd = {
+ .fd = cm_channel->fd,
+ .events = POLLIN,
+ };
+ int ret = poll(&pfd, 1, 0);
+ assert(ret >= 0);
+ if (!ret)
+ return -EAGAIN;
+
+ struct rdma_cm_event *cm_event;
+ if (rdma_get_cm_event(cm_channel, &cm_event)) {
+   ldout(cct, 10) << __func__ << " failed to get cm event: " << cpp_strerror(errno) << dendl;
+   return -EAGAIN;
+ }
+ ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(cm_event->event) << dendl;
+
+ struct rdma_cm_id *event_cm_id = cm_event->id;
+ struct rdma_event_channel *event_channel = rdma_create_event_channel();
+
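+ // migrate the accepted cm_id onto its own event channel so the new
+ // connected socket can watch its events independently of the listener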
+ rdma_migrate_id(event_cm_id, event_channel);
+
+ struct rdma_cm_id *new_cm_id = event_cm_id;
+ struct rdma_conn_param *remote_conn_param = &cm_event->param.conn;
+ struct rdma_conn_param local_conn_param;
+
+ RDMACMInfo info(new_cm_id, event_channel, remote_conn_param->qp_num);
+ RDMAIWARPConnectedSocketImpl* server =
+ new RDMAIWARPConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
+
+ memset(&local_conn_param, 0, sizeof(local_conn_param));
+ local_conn_param.qp_num = server->get_local_qpn();
+
+ if (rdma_accept(new_cm_id, &local_conn_param)) {
+   ldout(cct, 10) << __func__ << " rdma_accept failed: " << cpp_strerror(errno) << dendl;
+   rdma_ack_cm_event(cm_event);
+   return -EAGAIN;
+ }
+ server->activate();
+ ldout(cct, 20) << __func__ << " accepted a new QP" << dendl;
+
+ rdma_ack_cm_event(cm_event);
+
+ std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
+ *sock = ConnectedSocket(std::move(csi));
+ struct sockaddr *addr = &new_cm_id->route.addr.dst_addr;
+ out->set_sockaddr(addr);
+
+ return 0;
+}
+
+void RDMAIWARPServerSocketImpl::abort_accept()
+{
+ if (server_setup_socket >= 0) {
+ rdma_destroy_id(cm_id);
+ rdma_destroy_event_channel(cm_channel);
+ }
+}
ldout(cct, 15) << __func__ << dendl;
assert(sock);
+
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);
} else {
ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
conn->fault();
- erase_qpn_lockless(qpn);
+ if (!cct->_conf->ms_async_rdma_cm)
+ erase_qpn_lockless(qpn);
}
} else {
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
}
}
-int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
+void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
- int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
- assert(fd >= 0);
Mutex::Locker l(lock);
assert(!qp_conns.count(qp->get_local_qp_number()));
qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
++num_qp_conn;
- return fd;
}
RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
{
get_stack()->get_infiniband().init();
dispatcher->polling_start();
-
- auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+ RDMAServerSocketImpl *p;
+ if (cct->_conf->ms_async_rdma_type == "iwarp") {
+ p = new RDMAIWARPServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+ } else {
+ p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+ }
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
get_stack()->get_infiniband().init();
dispatcher->polling_start();
- RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+ RDMAConnectedSocketImpl* p;
+ if (cct->_conf->ms_async_rdma_type == "iwarp") {
+ p = new RDMAIWARPConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+ } else {
+ p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+ }
int r = p->try_connect(addr, opts);
if (r < 0) {
void polling_start();
void polling_stop();
void polling();
- int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
+ void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
Mutex::Locker l(w_lock);
auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
}
};
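+// handed from RDMAIWARPServerSocketImpl::accept() to the connected socket
+// it creates: the accepted cm_id, its private event channel, and the
+// remote QP number from the connect request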
+struct RDMACMInfo {
+ RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
+ : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
+ rdma_cm_id *cm_id;
+ rdma_event_channel *cm_channel;
+ uint32_t qp_num;
+};
+
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
public:
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::CompletionQueue CompletionQueue;
- private:
+ protected:
CephContext *cct;
Infiniband::QueuePair *qp;
IBSYNMsg peer_msg;
void handle_connection();
void cleanup();
void set_accept_fd(int sd);
- int try_connect(const entity_addr_t&, const SocketOptions &opt);
+ virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
bool is_pending() {return pending;}
void set_pending(bool val) {pending = val;}
+
class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
bool active;
};
};
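+// lifecycle of an rdma_cm-managed socket, in the order the states are
+// reached; close_notify() and the destructor key off how far we got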
+enum RDMA_CM_STATUS {
+ IDLE = 1,
+ RDMA_ID_CREATED,
+ CHANNEL_FD_CREATED,
+ RESOURCE_ALLOCATED,
+ ADDR_RESOLVED,
+ ROUTE_RESOLVED,
+ CONNECTED,
+ DISCONNECTED,
+ ERROR
+};
+
+class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
+ public:
+ RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+ RDMAWorker *w, RDMACMInfo *info = nullptr);
+ ~RDMAIWARPConnectedSocketImpl();
+ virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
+ virtual void close() override;
+ virtual void shutdown() override;
+ virtual void handle_cm_connection();
+ uint32_t get_local_qpn() const { return local_qpn; }
+ void activate();
+ int alloc_resource();
+ void close_notify();
+
+ private:
+ rdma_cm_id *cm_id;
+ rdma_event_channel *cm_channel;
+ uint32_t local_qpn;
+ uint32_t remote_qpn;
+ EventCallbackRef cm_con_handler;
+ bool is_server;
+ std::mutex close_mtx;
+ std::condition_variable close_condition;
+ bool closed = false;
+ RDMA_CM_STATUS status;
+
+ class C_handle_cm_connection : public EventCallback {
+ RDMAIWARPConnectedSocketImpl *csi;
+ public:
+ C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
+ void do_request(uint64_t fd) {
+ csi->handle_cm_connection();
+ }
+ };
+};
+
class RDMAServerSocketImpl : public ServerSocketImpl {
- CephContext *cct;
- NetHandler net;
- int server_setup_socket;
- Infiniband* infiniband;
- RDMADispatcher *dispatcher;
- RDMAWorker *worker;
- entity_addr_t sa;
+ protected:
+ CephContext *cct;
+ NetHandler net;
+ int server_setup_socket;
+ Infiniband* infiniband;
+ RDMADispatcher *dispatcher;
+ RDMAWorker *worker;
+ entity_addr_t sa;
public:
- RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+ RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
+ RDMAWorker *w, entity_addr_t& a);
- int listen(entity_addr_t &sa, const SocketOptions &opt);
+ virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
virtual int fd() const override { return server_setup_socket; }
int get_fd() { return server_setup_socket; }
};
+class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
+ public:
+ RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+ virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+ virtual void abort_accept() override;
+ private:
+ rdma_cm_id *cm_id;
+ rdma_event_channel *cm_channel;
+};
+
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
PerfCounters *perf_counter;