From 91bb13dceddb433966e6f775716e53ec5f523f5e Mon Sep 17 00:00:00 2001 From: Amir Vadai Date: Wed, 22 Mar 2017 09:05:21 +0200 Subject: [PATCH] msg/async/rdma: Introduce RDMAConnMgr Encapsulate all connection establishment stuff in a new class - RDMAConnMgr and make it a friend class of RDMAConnectedSocketImpl. This class will be inherited for every type of connection establishment - Currently only TCP is supported, very soon CM will be added too. RDMAServerConnImpl which only handle connection establishment became an abstract class and RDMAServerConnTCP is inherting it for connections of type TCP. Some of the code was left in its original file and place, and therefore it looks misplaced. This was done to make it easier to review and rebase. Once it is accepted a cleanup patch will be sent to move the code into the right place. Issue: 995322 Change-Id: I8b0e163525ec80c2452f4b6481bf696968cc1e51 Signed-off-by: Amir Vadai --- src/CMakeLists.txt | 1 + src/msg/async/rdma/Infiniband.cc | 88 ---------- src/msg/async/rdma/Infiniband.h | 10 +- src/msg/async/rdma/RDMAConnTCP.cc | 117 +++++++++++++ src/msg/async/rdma/RDMAConnTCP.h | 84 +++++++++ src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 160 +++++++++++------- src/msg/async/rdma/RDMAConnectedSocketImpl.h | 124 +++++++++----- src/msg/async/rdma/RDMAServerSocketImpl.cc | 16 +- src/msg/async/rdma/RDMAStack.cc | 3 +- 9 files changed, 402 insertions(+), 201 deletions(-) create mode 100644 src/msg/async/rdma/RDMAConnTCP.cc create mode 100644 src/msg/async/rdma/RDMAConnTCP.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 179598a332463..c578aff56c73e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -349,6 +349,7 @@ if(HAVE_RDMA) msg/async/rdma/Infiniband.cc msg/async/rdma/Device.cc msg/async/rdma/RDMAConnectedSocketImpl.cc + msg/async/rdma/RDMAConnTCP.cc msg/async/rdma/RDMAServerSocketImpl.cc msg/async/rdma/RDMAStack.cc) endif(HAVE_RDMA) diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 74f5c0436abd3..0a87b04ed5ac9 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -26,7 +26,6 @@ #define dout_prefix *_dout << "Infiniband " static const uint32_t MAX_INLINE_DATA = 0; -static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); Infiniband::QueuePair::QueuePair( CephContext *c, Device &device, ibv_qp_type type, @@ -613,93 +612,6 @@ Device* Infiniband::get_device(const char* device_name) return device_list->get_device(device_name); } -// 1 means no valid buffer read, 0 means got enough buffer -// else return < 0 means error -int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) -{ - char msg[TCP_MSG_LEN]; - char gid[33]; - ssize_t r = ::read(sd, &msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - if (r < 0) { - r = -errno; - lderr(cct) << __func__ << " got error " << r << ": " - << cpp_strerror(r) << dendl; - } else if (r == 0) { // valid disconnect message of length 0 - ldout(cct, 10) << __func__ << " got disconnect message " << dendl; - } else if ((size_t)r != sizeof(msg)) { // invalid message - ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; - r = -EINVAL; - } else { // valid message - sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); - wire_gid_to_gid(gid, &(im.gid)); - ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; - } - return r; -} - -int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im) -{ - int retry = 0; - ssize_t r; - - char msg[TCP_MSG_LEN]; - char gid[33]; -retry: - gid_to_wire_gid(&(im.gid), gid); - sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); - ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn - << ", " << im.peer_qpn << ", " << gid << dendl; - r = ::write(sd, msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - - if ((size_t)r != sizeof(msg)) { - // FIXME need to handle EAGAIN instead of retry - if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { - retry++; - goto retry; - } - if (r < 0) - lderr(cct) << __func__ << " send returned error " << errno << ": " - << cpp_strerror(errno) << dendl; - else - lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; - return -errno; - } - return 0; -} - -void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) -{ - char tmp[9]; - uint32_t v32; - int i; - - for (tmp[8] = 0, i = 0; i < 4; ++i) { - memcpy(tmp, wgid + i * 8, 8); - sscanf(tmp, "%x", &v32); - *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); - } -} - -void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) -{ - for (int i = 0; i < 4; ++i) - sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); -} - Infiniband::QueuePair::~QueuePair() { if (qp) { diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 682aeffbc6fab..cd74d9e57fc8e 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -144,9 +144,6 @@ class Infiniband { DeviceList *device_list; RDMADispatcher *dispatcher = nullptr; - void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); - void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); - public: explicit Infiniband(CephContext *c); ~Infiniband(); @@ -268,8 +265,6 @@ class Infiniband { }; public: - int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); - int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); static const char* wc_status_to_string(int status); static const char* qp_state_string(int status); @@ -285,4 +280,9 @@ class Infiniband { RDMADispatcher *get_dispatcher() { return dispatcher; } }; +inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp) +{ + return out << qp.get_local_qp_number(); +} + #endif diff --git a/src/msg/async/rdma/RDMAConnTCP.cc b/src/msg/async/rdma/RDMAConnTCP.cc new file mode 100644 index 0000000000000..f2c3c9a68b294 --- /dev/null +++ b/src/msg/async/rdma/RDMAConnTCP.cc @@ -0,0 +1,117 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "RDMAStack.h" +#include "Device.h" +#include "RDMAConnTCP.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAConnTCP " + +static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); + +// 1 means no valid buffer read, 0 means got enough buffer +// else return < 0 means error +int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) +{ + char msg[TCP_MSG_LEN]; + char gid[33]; + ssize_t r = ::read(sd, &msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " got error " << r << ": " + << cpp_strerror(r) << dendl; + } else if (r == 0) { // valid disconnect message of length 0 + ldout(cct, 10) << __func__ << " got disconnect message " << dendl; + } else if ((size_t)r != sizeof(msg)) { // invalid message + ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; + r = -EINVAL; + } else { // valid message + sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); + wire_gid_to_gid(gid, &(im.gid)); + ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; + } + return r; +} + +int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im) +{ + int retry = 0; + ssize_t r; + + char msg[TCP_MSG_LEN]; + char gid[33]; +retry: + gid_to_wire_gid(&(im.gid), gid); + sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); + ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn + << ", " << im.peer_qpn << ", " << gid << dendl; + r = ::write(sd, msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + + if ((size_t)r != sizeof(msg)) { + // FIXME need to handle EAGAIN instead of retry + if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { + retry++; + goto retry; + } + if (r < 0) + lderr(cct) << __func__ << " send returned error " << errno << ": " + << cpp_strerror(errno) << dendl; + else + lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; + return -errno; + } + return 0; +} + +void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) +{ + char tmp[9]; + uint32_t v32; + int i; + + for (tmp[8] = 0, i = 0; i < 4; ++i) { + memcpy(tmp, wgid + i * 8, 8); + sscanf(tmp, "%x", &v32); + *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); + } +} + +void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) +{ + for (int i = 0; i < 4; ++i) + sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); +} + +ostream &RDMAConnTCP::print(ostream &out) const +{ + return out << "TCP {tcp_fd: " << tcp_fd << "}"; +} diff --git a/src/msg/async/rdma/RDMAConnTCP.h b/src/msg/async/rdma/RDMAConnTCP.h new file mode 100644 index 0000000000000..39d96d5de13b7 --- /dev/null +++ b/src/msg/async/rdma/RDMAConnTCP.h @@ -0,0 +1,84 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H +#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "msg/async/Stack.h" +#include "Infiniband.h" +#include "RDMAConnectedSocketImpl.h" + +class RDMAWorker; +class RDMADispatcher; + +class RDMAConnTCP : public RDMAConnMgr { + class C_handle_connection : public EventCallback { + RDMAConnTCP *cst; + bool active; + public: + C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {}; + void do_request(int fd) { + if (active) + cst->handle_connection(); + }; + void close() { + active = false; + }; + }; + + IBSYNMsg peer_msg; + IBSYNMsg my_msg; + EventCallbackRef con_handler; + int tcp_fd = -1; + + private: + void handle_connection(); + int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); + int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); + int activate(); + void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); + void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); + + public: + RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock, + Infiniband* ib, RDMADispatcher* s, RDMAWorker *w); + virtual ~RDMAConnTCP(); + + virtual ostream &print(ostream &out) const override; + + void set_accept_fd(int sd); + + virtual void cleanup() override; + virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override; +}; + +class RDMAServerConnTCP : public RDMAServerSocketImpl { + NetHandler net; + int server_setup_socket; + + public: + RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); + + int listen(entity_addr_t &sa, const SocketOptions &opt); + virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; + virtual void abort_accept() override; + virtual int fd() const override { return server_setup_socket; } +}; + +#endif diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 718b7a25c125b..80f3c014adb0d 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -16,48 +16,85 @@ #include "RDMAStack.h" #include "Device.h" +#include "RDMAConnectedSocketImpl.h" +#include "RDMAConnTCP.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << " RDMAConnectedSocketImpl " +RDMAConnMgr::RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock, + Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) + : cct(cct), socket(sock), infiniband(ib), dispatcher(s), worker(w), + is_server(false), active(false), + connected(0) +{ +} + RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) - : cct(cct), connected(0), error(0), infiniband(ib), - dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), - is_server(false), con_handler(new C_handle_connection(this)), - active(false) + : cct(cct), infiniband(ib), dispatcher(s), worker(w), + error(0), lock("RDMAConnectedSocketImpl::lock") +{ + cmgr = new RDMAConnTCP(cct, this, ib, s, w); +} + +QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p) { - ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str()); - ibport = cct->_conf->ms_async_rdma_port_num; + ibdev = d; + ibport = p; + + qp = ibdev->create_queue_pair(cct, IBV_QPT_RC); + + local_qpn = qp->get_local_qp_number(); + + return qp; +} + +RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock, + Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) + : RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this)) +{ + Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str()); + int ibport = cct->_conf->ms_async_rdma_port_num; assert(ibdev); assert(ibport > 0); ibdev->init(ibport); - qp = ibdev->create_queue_pair(cct, IBV_QPT_RC); - my_msg.qpn = qp->get_local_qp_number(); + QueuePair *qp = socket->create_queue_pair(ibdev, ibport); + + my_msg.qpn = socket->local_qpn; my_msg.psn = qp->get_initial_psn(); my_msg.lid = ibdev->get_lid(); my_msg.peer_qpn = 0; my_msg.gid = ibdev->get_gid(); + socket->register_qp(qp); +} + +void RDMAConnectedSocketImpl::register_qp(QueuePair *qp) +{ notify_fd = dispatcher->register_qp(qp, this); dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); } +RDMAConnTCP::~RDMAConnTCP() +{ + cleanup(); + if (tcp_fd >= 0) + ::close(tcp_fd); +} + RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() { ldout(cct, 20) << __func__ << " destruct." << dendl; - cleanup(); worker->remove_pending_conn(this); - dispatcher->erase_qpn(my_msg.qpn); + dispatcher->erase_qpn(local_qpn); Mutex::Locker l(lock); if (notify_fd >= 0) ::close(notify_fd); - if (tcp_fd >= 0) - ::close(tcp_fd); error = ECONNRESET; int ret = 0; for (unsigned i=0; i < wc.size(); ++i) { @@ -68,6 +105,8 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() ret = ibdev->post_chunk(buffers[i]); assert(ret == 0); } + + delete cmgr; } void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) @@ -88,11 +127,13 @@ void RDMAConnectedSocketImpl::get_wc(std::vector &w) w.swap(wc); } -int RDMAConnectedSocketImpl::activate() +int RDMAConnTCP::activate() { ibv_qp_attr qpa; int r; + socket->remote_qpn = peer_msg.qpn; + // now connect up the qps and switch to RTR memset(&qpa, 0, sizeof(qpa)); qpa.qp_state = IBV_QPS_RTR; @@ -106,15 +147,17 @@ int RDMAConnectedSocketImpl::activate() qpa.ah_attr.grh.hop_limit = 6; qpa.ah_attr.grh.dgid = peer_msg.gid; - qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(); + qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx(); qpa.ah_attr.dlid = peer_msg.lid; qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; qpa.ah_attr.src_path_bits = 0; - qpa.ah_attr.port_num = (uint8_t)ibport; + qpa.ah_attr.port_num = (uint8_t)socket->get_ibport(); ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; + QueuePair *qp = socket->get_qp(); + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | @@ -168,15 +211,15 @@ int RDMAConnectedSocketImpl::activate() if (!is_server) { connected = 1; //indicate successfully - ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; - submit(false); + ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << *qp << dendl; + socket->submit(false); } active = true; return 0; } -int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { +int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; NetHandler net(cct); @@ -197,7 +240,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); my_msg.peer_qpn = 0; - r = infiniband->send_msg(cct, tcp_fd, my_msg); + r = send_msg(cct, tcp_fd, my_msg); if (r < 0) return r; @@ -205,14 +248,14 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S return 0; } -void RDMAConnectedSocketImpl::handle_connection() { - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl; - int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); +void RDMAConnTCP::handle_connection() { + ldout(cct, 20) << __func__ << " " << *socket << dendl; + int r = recv_msg(cct, tcp_fd, peer_msg); if (r < 0) { if (r != -EAGAIN) { dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; - fault(); + socket->fault(); } return; } @@ -225,12 +268,12 @@ void RDMAConnectedSocketImpl::handle_connection() { r = activate(); assert(!r); } - notify(); - r = infiniband->send_msg(cct, tcp_fd, my_msg); + socket->notify(); + r = send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " send client ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - fault(); + socket->fault(); } } else { if (peer_msg.peer_qpn == 0) {// syn from client @@ -238,11 +281,11 @@ void RDMAConnectedSocketImpl::handle_connection() { ldout(cct, 10) << __func__ << " server is already active." << dendl; return ; } - r = infiniband->send_msg(cct, tcp_fd, my_msg); + r = send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " server ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - fault(); + socket->fault(); return ; } r = activate(); @@ -250,17 +293,28 @@ void RDMAConnectedSocketImpl::handle_connection() { } else { // ack from client connected = 1; cleanup(); - submit(false); - notify(); + socket->submit(false); + socket->notify(); } } } +void RDMAConnMgr::post_read() +{ + if (!is_server || connected) + return; + + ldout(cct, 20) << __func__ << " we do not need last handshake, " << *socket << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + socket->submit(false); +} + ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) { uint64_t i = 0; int r = ::read(notify_fd, &i, sizeof(i)); - ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; + ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << *qp << " r = " << r << dendl; if (error) return -error; ssize_t read = 0; @@ -272,7 +326,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) if (cqe.empty()) return read == 0 ? -EAGAIN : read; - ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; + ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << *qp << dendl; for (size_t i = 0; i < cqe.size(); ++i) { ibv_wc* response = &cqe[i]; assert(response->status == IBV_WC_SUCCESS); @@ -282,7 +336,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); if (response->byte_len == 0) { dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); - if (connected) { + if (cmgr->connected) { error = ECONNRESET; ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; } @@ -303,12 +357,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) } worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); - if (is_server && connected == 0) { - ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; - connected = 1; //if so, we don't need the last handshake - cleanup(); - submit(false); - } + cmgr->post_read(); if (read == 0 && error) return -error; @@ -389,7 +438,7 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { if (error) { - if (!active) + if (!cmgr->active) return -EPIPE; return -error; } @@ -399,12 +448,12 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { Mutex::Locker l(lock); pending_bl.claim_append(bl); - if (!connected) { - ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; + if (!cmgr->connected) { + ldout(cct, 20) << __func__ << " fake send to upper, QP: " << *qp << dendl; return bytes; } } - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; + ldout(cct, 20) << __func__ << " QP: " << *qp << dendl; ssize_t r = submit(more); if (r < 0 && r != -EAGAIN) return r; @@ -508,7 +557,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) { - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; + ldout(cct, 20) << __func__ << " QP: " << *qp << " " << tx_buffers[0] << dendl; vector::iterator current_buffer = tx_buffers.begin(); ibv_sge isge[tx_buffers.size()]; uint32_t current_sge = 0; @@ -575,7 +624,7 @@ void RDMAConnectedSocketImpl::fin() { } } -void RDMAConnectedSocketImpl::cleanup() { +void RDMAConnTCP::cleanup() { if (con_handler && tcp_fd >= 0) { (static_cast(con_handler))->close(); worker->center.submit_to(worker->center.get_id(), [this]() { @@ -595,35 +644,32 @@ void RDMAConnectedSocketImpl::notify() assert(ret = sizeof(i)); } -void RDMAConnectedSocketImpl::shutdown() +void RDMAConnMgr::shutdown() { - if (!error) - fin(); - error = ECONNRESET; + if (!socket->error) + socket->fin(); + socket->error = ECONNRESET; active = false; } -void RDMAConnectedSocketImpl::close() +void RDMAConnMgr::close() { - if (!error) - fin(); - error = ECONNRESET; - active = false; + shutdown(); } void RDMAConnectedSocketImpl::fault() { - ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; + ldout(cct, 1) << __func__ << dendl; /*if (qp) { qp->to_dead(); qp = NULL; }*/ error = ECONNRESET; - connected = 1; + cmgr->connected = 1; notify(); } -void RDMAConnectedSocketImpl::set_accept_fd(int sd) +void RDMAConnTCP::set_accept_fd(int sd) { tcp_fd = sd; is_server = true; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.h b/src/msg/async/rdma/RDMAConnectedSocketImpl.h index e9b1ef96fb4d3..d48334b2eeb9d 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.h +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.h @@ -25,88 +25,123 @@ class RDMAWorker; class RDMADispatcher; +class RDMAConnectedSocketImpl; + +typedef Infiniband::QueuePair QueuePair; + +class RDMAConnMgr { + friend class RDMAConnectedSocketImpl; + + protected: + CephContext *cct; + RDMAConnectedSocketImpl *socket; + Infiniband* infiniband; + RDMADispatcher* dispatcher; + RDMAWorker* worker; + + bool is_server; + bool active;// qp is active ? + int connected; + + public: + RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock, + Infiniband* ib, RDMADispatcher* s, RDMAWorker *w); + virtual ~RDMAConnMgr() { }; + + virtual ostream &print(ostream &out) const = 0; + + virtual void cleanup() = 0; + virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0; + virtual void set_accept_fd(int sd) = 0; + + void post_read(); + + void shutdown(); + void close(); +}; +inline ostream& operator<<(ostream& out, const RDMAConnMgr &m) +{ + return m.print(out); +} class RDMAConnectedSocketImpl : public ConnectedSocketImpl { + friend class RDMAConnMgr; + + protected: + CephContext *cct; + Infiniband* infiniband; + RDMADispatcher* dispatcher; + RDMAWorker* worker; + Device *ibdev = nullptr; + int ibport = -1; + QueuePair *qp = nullptr; + public: typedef Infiniband::MemoryManager::Chunk Chunk; typedef Infiniband::CompletionChannel CompletionChannel; typedef Infiniband::CompletionQueue CompletionQueue; private: - CephContext *cct; - Infiniband::QueuePair *qp; - Device *ibdev; - int ibport; - IBSYNMsg peer_msg; - IBSYNMsg my_msg; - int connected; + RDMAConnMgr *cmgr; int error; - Infiniband* infiniband; - RDMADispatcher* dispatcher; - RDMAWorker* worker; std::vector buffers; int notify_fd = -1; bufferlist pending_bl; Mutex lock; std::vector wc; - bool is_server; - EventCallbackRef con_handler; - int tcp_fd = -1; - bool active;// qp is active ? - void notify(); ssize_t read_buffers(char* buf, size_t len); int post_work_request(std::vector&); public: + uint32_t local_qpn = 0; + uint32_t remote_qpn = 0; + RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, RDMAWorker *w); virtual ~RDMAConnectedSocketImpl(); - Device *get_device() { return ibdev; } + ostream &print(ostream &out) const { + return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}"; + }; + + Device *get_device() { return ibdev; }; + int get_ibport() { return ibport; }; void pass_wc(std::vector &&v); void get_wc(std::vector &w); - virtual int is_connected() override { return connected; } + virtual int is_connected() override { return cmgr->connected; } virtual ssize_t read(char* buf, size_t len) override; virtual ssize_t zero_copy_read(bufferptr &data) override; virtual ssize_t send(bufferlist &bl, bool more) override; - virtual void shutdown() override; - virtual void close() override; + virtual void shutdown() override { cmgr->shutdown(); }; + virtual void close() override { cmgr->close(); }; virtual int fd() const override { return notify_fd; } void fault(); const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } + QueuePair *get_qp() { return qp; }; ssize_t submit(bool more); - int activate(); void fin(); - void handle_connection(); - void cleanup(); - void set_accept_fd(int sd); - int try_connect(const entity_addr_t&, const SocketOptions &opt); - - class C_handle_connection : public EventCallback { - RDMAConnectedSocketImpl *csi; - bool active; - public: - C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {} - void do_request(int fd) { - if (active) - csi->handle_connection(); - } - void close() { - active = false; - } - }; + void register_qp(QueuePair *qp); + void notify(); + + QueuePair *create_queue_pair(Device *d, int p); + void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); }; + int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); }; }; +inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s) +{ + return s.print(out); +} + class RDMAServerSocketImpl : public ServerSocketImpl { + protected: CephContext *cct; Device *ibdev; int ibport; - NetHandler net; - int server_setup_socket; Infiniband* infiniband; RDMADispatcher *dispatcher; RDMAWorker *worker; @@ -115,11 +150,10 @@ class RDMAServerSocketImpl : public ServerSocketImpl { public: RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); - int listen(entity_addr_t &sa, const SocketOptions &opt); - virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; - virtual void abort_accept() override; - virtual int fd() const override { return server_setup_socket; } - int get_fd() { return server_setup_socket; } + virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0; + virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0; + virtual void abort_accept() = 0; + virtual int fd() const = 0; }; #endif diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index 93a22532796d1..f2b6b79b4233e 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -17,13 +17,19 @@ #include "msg/async/net_handler.h" #include "RDMAStack.h" #include "Device.h" +#include "RDMAConnTCP.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << " RDMAServerSocketImpl " RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) - : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) + : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a) +{ +} + +RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) + : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1) { ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str()); ibport = cct->_conf->ms_async_rdma_port_num; @@ -34,7 +40,7 @@ RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMA ibdev->init(ibport); } -int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) +int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt) { int rc = 0; server_setup_socket = net.create_socket(sa.get_family(), true); @@ -80,7 +86,7 @@ err: return -errno; } -int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) +int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { ldout(cct, 15) << __func__ << dendl; @@ -106,7 +112,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt } net.set_priority(sd, opt.priority, out->get_family()); - RDMAConnectedSocketImpl* server; + RDMAConnectedSocketImpl *server; //Worker* w = dispatcher->get_stack()->get_worker(); server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast(w)); server->set_accept_fd(sd); @@ -119,7 +125,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt return 0; } -void RDMAServerSocketImpl::abort_accept() +void RDMAServerConnTCP::abort_accept() { if (server_setup_socket >= 0) ::close(server_setup_socket); diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index dc856b965e8e3..b4e7fab3483ab 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -21,6 +21,7 @@ #include "common/deleter.h" #include "common/Tub.h" #include "RDMAStack.h" +#include "RDMAConnTCP.h" #include "Device.h" #define dout_subsys ceph_subsys_ms @@ -410,7 +411,7 @@ void RDMAWorker::initialize() int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { - auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); + auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { delete p; -- 2.39.5