From: Amir Vadai Date: Tue, 23 May 2017 07:33:45 +0000 (+0300) Subject: Revert "msg/async/rdma: Introduce RDMAConnMgr" X-Git-Tag: ses5-milestone6~9^2~21^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a0e06932363d567469971c91baf2867d31c2e059;p=ceph.git Revert "msg/async/rdma: Introduce RDMAConnMgr" This reverts commit 91bb13dceddb433966e6f775716e53ec5f523f5e. Change-Id: I91dbe3c02651402f0e97e89151ec6a5154afdca3 Issue: 995322 Signed-off-by: Amir Vadai --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ad658e722b5e..e9f86e31cb57 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -364,7 +364,6 @@ if(HAVE_RDMA) msg/async/rdma/Infiniband.cc msg/async/rdma/Device.cc msg/async/rdma/RDMAConnectedSocketImpl.cc - msg/async/rdma/RDMAConnTCP.cc msg/async/rdma/RDMAServerSocketImpl.cc msg/async/rdma/RDMAStack.cc) endif(HAVE_RDMA) diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index de9d6e21b9a0..020d1be47b6e 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -26,6 +26,7 @@ #define dout_prefix *_dout << "Infiniband " static const uint32_t MAX_INLINE_DATA = 0; +static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); Infiniband::QueuePair::QueuePair( CephContext *c, Device &device, ibv_qp_type type, @@ -609,6 +610,93 @@ Device* Infiniband::get_device(const char* device_name) return device_list->get_device(device_name); } +// 1 means no valid buffer read, 0 means got enough buffer +// else return < 0 means error +int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) +{ + char msg[TCP_MSG_LEN]; + char gid[33]; + ssize_t r = ::read(sd, &msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " got error " << r << ": " + << cpp_strerror(r) << dendl; + } else if (r == 0) { // valid disconnect message of length 0 + ldout(cct, 10) << __func__ << " got disconnect message " << dendl; + } else if ((size_t)r != sizeof(msg)) { // invalid message + ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; + r = -EINVAL; + } else { // valid message + sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); + wire_gid_to_gid(gid, &(im.gid)); + ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; + } + return r; +} + +int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im) +{ + int retry = 0; + ssize_t r; + + char msg[TCP_MSG_LEN]; + char gid[33]; +retry: + gid_to_wire_gid(&(im.gid), gid); + sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); + ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn + << ", " << im.peer_qpn << ", " << gid << dendl; + r = ::write(sd, msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + + if ((size_t)r != sizeof(msg)) { + // FIXME need to handle EAGAIN instead of retry + if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { + retry++; + goto retry; + } + if (r < 0) + lderr(cct) << __func__ << " send returned error " << errno << ": " + << cpp_strerror(errno) << dendl; + else + lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; + return -errno; + } + return 0; +} + +void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) +{ + char tmp[9]; + uint32_t v32; + int i; + + for (tmp[8] = 0, i = 0; i < 4; ++i) { + memcpy(tmp, wgid + i * 8, 8); + sscanf(tmp, "%x", &v32); + *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); + } +} + +void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) +{ + for (int i = 0; i < 4; ++i) + sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); +} + Infiniband::QueuePair::~QueuePair() { if (qp) { diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 98b710a0385d..2ac392738e4e 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -144,6 +144,9 @@ class Infiniband { DeviceList *device_list; RDMADispatcher *dispatcher = nullptr; + void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); + void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); + public: explicit Infiniband(CephContext *c); ~Infiniband(); @@ -265,6 +268,8 @@ class Infiniband { }; public: + int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); + int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); static const char* wc_status_to_string(int status); static const char* qp_state_string(int status); @@ -280,9 +285,4 @@ class Infiniband { RDMADispatcher *get_dispatcher() { return dispatcher; } }; -inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp) -{ - return out << qp.get_local_qp_number(); -} - #endif diff --git a/src/msg/async/rdma/RDMAConnTCP.cc b/src/msg/async/rdma/RDMAConnTCP.cc deleted file mode 100644 index f2c3c9a68b29..000000000000 --- a/src/msg/async/rdma/RDMAConnTCP.cc +++ /dev/null @@ -1,117 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "RDMAStack.h" -#include "Device.h" -#include "RDMAConnTCP.h" - -#define dout_subsys ceph_subsys_ms -#undef dout_prefix -#define dout_prefix *_dout << " RDMAConnTCP " - -static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); - -// 1 means no valid buffer read, 0 means got enough buffer -// else return < 0 means error -int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) -{ - char msg[TCP_MSG_LEN]; - char gid[33]; - ssize_t r = ::read(sd, &msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - if (r < 0) { - r = -errno; - lderr(cct) << __func__ << " got error " << r << ": " - << cpp_strerror(r) << dendl; - } else if (r == 0) { // valid disconnect message of length 0 - ldout(cct, 10) << __func__ << " got disconnect message " << dendl; - } else if ((size_t)r != sizeof(msg)) { // invalid message - ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; - r = -EINVAL; - } else { // valid message - sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); - wire_gid_to_gid(gid, &(im.gid)); - ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; - } - return r; -} - -int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im) -{ - int retry = 0; - ssize_t r; - - char msg[TCP_MSG_LEN]; - char gid[33]; -retry: - gid_to_wire_gid(&(im.gid), gid); - sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); - ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn - << ", " << im.peer_qpn << ", " << gid << dendl; - r = ::write(sd, msg, sizeof(msg)); - // Drop incoming qpt - if (cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; - return -EINVAL; - } - } - - if ((size_t)r != sizeof(msg)) { - // FIXME need to handle EAGAIN instead of retry - if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { - retry++; - goto retry; - } - if (r < 0) - lderr(cct) << __func__ << " send returned error " << errno << ": " - << cpp_strerror(errno) << dendl; - else - lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; - return -errno; - } - return 0; -} - -void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) -{ - char tmp[9]; - uint32_t v32; - int i; - - for (tmp[8] = 0, i = 0; i < 4; ++i) { - memcpy(tmp, wgid + i * 8, 8); - sscanf(tmp, "%x", &v32); - *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); - } -} - -void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) -{ - for (int i = 0; i < 4; ++i) - sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); -} - -ostream &RDMAConnTCP::print(ostream &out) const -{ - return out << "TCP {tcp_fd: " << tcp_fd << "}"; -} diff --git a/src/msg/async/rdma/RDMAConnTCP.h b/src/msg/async/rdma/RDMAConnTCP.h deleted file mode 100644 index 39d96d5de13b..000000000000 --- a/src/msg/async/rdma/RDMAConnTCP.h +++ /dev/null @@ -1,84 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H -#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H - -#include "common/ceph_context.h" -#include "common/debug.h" -#include "common/errno.h" -#include "msg/async/Stack.h" -#include "Infiniband.h" -#include "RDMAConnectedSocketImpl.h" - -class RDMAWorker; -class RDMADispatcher; - -class RDMAConnTCP : public RDMAConnMgr { - class C_handle_connection : public EventCallback { - RDMAConnTCP *cst; - bool active; - public: - C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {}; - void do_request(int fd) { - if (active) - cst->handle_connection(); - }; - void close() { - active = false; - }; - }; - - IBSYNMsg peer_msg; - IBSYNMsg my_msg; - EventCallbackRef con_handler; - int tcp_fd = -1; - - private: - void handle_connection(); - int send_msg(CephContext *cct, int sd, IBSYNMsg& msg); - int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg); - int activate(); - void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); - void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); - - public: - RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock, - Infiniband* ib, RDMADispatcher* s, RDMAWorker *w); - virtual ~RDMAConnTCP(); - - virtual ostream &print(ostream &out) const override; - - void set_accept_fd(int sd); - - virtual void cleanup() override; - virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override; -}; - -class RDMAServerConnTCP : public RDMAServerSocketImpl { - NetHandler net; - int server_setup_socket; - - public: - RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); - - int listen(entity_addr_t &sa, const SocketOptions &opt); - virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; - virtual void abort_accept() override; - virtual int fd() const override { return server_setup_socket; } -}; - -#endif diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index ca7ac180d53a..e7066963de9a 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -16,85 +16,48 @@ #include "RDMAStack.h" #include "Device.h" -#include "RDMAConnectedSocketImpl.h" -#include "RDMAConnTCP.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << " RDMAConnectedSocketImpl " -RDMAConnMgr::RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock, - Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) - : cct(cct), socket(sock), infiniband(ib), dispatcher(s), worker(w), - is_server(false), active(false), - connected(0) -{ -} - RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) - : cct(cct), infiniband(ib), dispatcher(s), worker(w), - error(0), lock("RDMAConnectedSocketImpl::lock") -{ - cmgr = new RDMAConnTCP(cct, this, ib, s, w); -} - -QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p) + : cct(cct), connected(0), error(0), infiniband(ib), + dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), + is_server(false), con_handler(new C_handle_connection(this)), + active(false) { - ibdev = d; - ibport = p; - - qp = ibdev->create_queue_pair(cct, IBV_QPT_RC); - - local_qpn = qp->get_local_qp_number(); - - return qp; -} - -RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock, - Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) - : RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this)) -{ - Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str()); - int ibport = cct->_conf->ms_async_rdma_port_num; + ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str()); + ibport = cct->_conf->ms_async_rdma_port_num; assert(ibdev); assert(ibport > 0); ibdev->init(ibport); - QueuePair *qp = socket->create_queue_pair(ibdev, ibport); - - my_msg.qpn = socket->local_qpn; + qp = ibdev->create_queue_pair(cct, IBV_QPT_RC); + my_msg.qpn = qp->get_local_qp_number(); my_msg.psn = qp->get_initial_psn(); my_msg.lid = ibdev->get_lid(); my_msg.peer_qpn = 0; my_msg.gid = ibdev->get_gid(); - socket->register_qp(qp); -} - -void RDMAConnectedSocketImpl::register_qp(QueuePair *qp) -{ notify_fd = dispatcher->register_qp(qp, this); dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); } -RDMAConnTCP::~RDMAConnTCP() -{ - cleanup(); - if (tcp_fd >= 0) - ::close(tcp_fd); -} - RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() { ldout(cct, 20) << __func__ << " destruct." << dendl; + cleanup(); worker->remove_pending_conn(this); - dispatcher->erase_qpn(local_qpn); + dispatcher->erase_qpn(my_msg.qpn); Mutex::Locker l(lock); if (notify_fd >= 0) ::close(notify_fd); + if (tcp_fd >= 0) + ::close(tcp_fd); error = ECONNRESET; int ret = 0; for (unsigned i=0; i < wc.size(); ++i) { @@ -107,8 +70,6 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() assert(ret == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } - - delete cmgr; } void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) @@ -129,13 +90,11 @@ void RDMAConnectedSocketImpl::get_wc(std::vector &w) w.swap(wc); } -int RDMAConnTCP::activate() +int RDMAConnectedSocketImpl::activate() { ibv_qp_attr qpa; int r; - socket->remote_qpn = peer_msg.qpn; - // now connect up the qps and switch to RTR memset(&qpa, 0, sizeof(qpa)); qpa.qp_state = IBV_QPS_RTR; @@ -149,17 +108,15 @@ int RDMAConnTCP::activate() qpa.ah_attr.grh.hop_limit = 6; qpa.ah_attr.grh.dgid = peer_msg.gid; - qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx(); + qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(); qpa.ah_attr.dlid = peer_msg.lid; qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; qpa.ah_attr.src_path_bits = 0; - qpa.ah_attr.port_num = (uint8_t)socket->get_ibport(); + qpa.ah_attr.port_num = (uint8_t)ibport; ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; - QueuePair *qp = socket->get_qp(); - r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | @@ -213,15 +170,15 @@ int RDMAConnTCP::activate() if (!is_server) { connected = 1; //indicate successfully - ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << *qp << dendl; - socket->submit(false); + ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; + submit(false); } active = true; return 0; } -int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { +int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; NetHandler net(cct); @@ -242,7 +199,7 @@ int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); my_msg.peer_qpn = 0; - r = send_msg(cct, tcp_fd, my_msg); + r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) return r; @@ -250,14 +207,14 @@ int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions return 0; } -void RDMAConnTCP::handle_connection() { - ldout(cct, 20) << __func__ << " " << *socket << dendl; - int r = recv_msg(cct, tcp_fd, peer_msg); +void RDMAConnectedSocketImpl::handle_connection() { + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl; + int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); if (r < 0) { if (r != -EAGAIN) { dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; - socket->fault(); + fault(); } return; } @@ -270,12 +227,12 @@ void RDMAConnTCP::handle_connection() { r = activate(); assert(!r); } - socket->notify(); - r = send_msg(cct, tcp_fd, my_msg); + notify(); + r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " send client ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - socket->fault(); + fault(); } } else { if (peer_msg.peer_qpn == 0) {// syn from client @@ -283,11 +240,11 @@ void RDMAConnTCP::handle_connection() { ldout(cct, 10) << __func__ << " server is already active." << dendl; return ; } - r = send_msg(cct, tcp_fd, my_msg); + r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " server ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - socket->fault(); + fault(); return ; } r = activate(); @@ -295,28 +252,17 @@ void RDMAConnTCP::handle_connection() { } else { // ack from client connected = 1; cleanup(); - socket->submit(false); - socket->notify(); + submit(false); + notify(); } } } -void RDMAConnMgr::post_read() -{ - if (!is_server || connected) - return; - - ldout(cct, 20) << __func__ << " we do not need last handshake, " << *socket << dendl; - connected = 1; //if so, we don't need the last handshake - cleanup(); - socket->submit(false); -} - ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) { uint64_t i = 0; int r = ::read(notify_fd, &i, sizeof(i)); - ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << *qp << " r = " << r << dendl; + ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; if (error) return -error; ssize_t read = 0; @@ -328,7 +274,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) if (cqe.empty()) return read == 0 ? -EAGAIN : read; - ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << *qp << dendl; + ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; for (size_t i = 0; i < cqe.size(); ++i) { ibv_wc* response = &cqe[i]; assert(response->status == IBV_WC_SUCCESS); @@ -338,7 +284,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); if (response->byte_len == 0) { dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); - if (cmgr->connected) { + if (connected) { error = ECONNRESET; ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; } @@ -361,7 +307,12 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) } worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); - cmgr->post_read(); + if (is_server && connected == 0) { + ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + submit(false); + } if (read == 0 && error) return -error; @@ -443,7 +394,7 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { if (error) { - if (!cmgr->active) + if (!active) return -EPIPE; return -error; } @@ -453,12 +404,12 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { Mutex::Locker l(lock); pending_bl.claim_append(bl); - if (!cmgr->connected) { - ldout(cct, 20) << __func__ << " fake send to upper, QP: " << *qp << dendl; + if (!connected) { + ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; return bytes; } } - ldout(cct, 20) << __func__ << " QP: " << *qp << dendl; + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; ssize_t r = submit(more); if (r < 0 && r != -EAGAIN) return r; @@ -562,7 +513,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) { - ldout(cct, 20) << __func__ << " QP: " << *qp << " " << tx_buffers[0] << dendl; + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; vector::iterator current_buffer = tx_buffers.begin(); ibv_sge isge[tx_buffers.size()]; uint32_t current_sge = 0; @@ -629,7 +580,7 @@ void RDMAConnectedSocketImpl::fin() { } } -void RDMAConnTCP::cleanup() { +void RDMAConnectedSocketImpl::cleanup() { if (con_handler && tcp_fd >= 0) { (static_cast(con_handler))->close(); worker->center.submit_to(worker->center.get_id(), [this]() { @@ -649,32 +600,35 @@ void RDMAConnectedSocketImpl::notify() assert(ret = sizeof(i)); } -void RDMAConnMgr::shutdown() +void RDMAConnectedSocketImpl::shutdown() { - if (!socket->error) - socket->fin(); - socket->error = ECONNRESET; + if (!error) + fin(); + error = ECONNRESET; active = false; } -void RDMAConnMgr::close() +void RDMAConnectedSocketImpl::close() { - shutdown(); + if (!error) + fin(); + error = ECONNRESET; + active = false; } void RDMAConnectedSocketImpl::fault() { - ldout(cct, 1) << __func__ << dendl; + ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; /*if (qp) { qp->to_dead(); qp = NULL; }*/ error = ECONNRESET; - cmgr->connected = 1; + connected = 1; notify(); } -void RDMAConnTCP::set_accept_fd(int sd) +void RDMAConnectedSocketImpl::set_accept_fd(int sd) { tcp_fd = sd; is_server = true; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.h b/src/msg/async/rdma/RDMAConnectedSocketImpl.h index d48334b2eeb9..e9b1ef96fb4d 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.h +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.h @@ -25,123 +25,88 @@ class RDMAWorker; class RDMADispatcher; -class RDMAConnectedSocketImpl; - -typedef Infiniband::QueuePair QueuePair; - -class RDMAConnMgr { - friend class RDMAConnectedSocketImpl; - - protected: - CephContext *cct; - RDMAConnectedSocketImpl *socket; - Infiniband* infiniband; - RDMADispatcher* dispatcher; - RDMAWorker* worker; - - bool is_server; - bool active;// qp is active ? - int connected; - - public: - RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock, - Infiniband* ib, RDMADispatcher* s, RDMAWorker *w); - virtual ~RDMAConnMgr() { }; - - virtual ostream &print(ostream &out) const = 0; - - virtual void cleanup() = 0; - virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0; - virtual void set_accept_fd(int sd) = 0; - - void post_read(); - - void shutdown(); - void close(); -}; -inline ostream& operator<<(ostream& out, const RDMAConnMgr &m) -{ - return m.print(out); -} class RDMAConnectedSocketImpl : public ConnectedSocketImpl { - friend class RDMAConnMgr; - - protected: - CephContext *cct; - Infiniband* infiniband; - RDMADispatcher* dispatcher; - RDMAWorker* worker; - Device *ibdev = nullptr; - int ibport = -1; - QueuePair *qp = nullptr; - public: typedef Infiniband::MemoryManager::Chunk Chunk; typedef Infiniband::CompletionChannel CompletionChannel; typedef Infiniband::CompletionQueue CompletionQueue; private: - RDMAConnMgr *cmgr; + CephContext *cct; + Infiniband::QueuePair *qp; + Device *ibdev; + int ibport; + IBSYNMsg peer_msg; + IBSYNMsg my_msg; + int connected; int error; + Infiniband* infiniband; + RDMADispatcher* dispatcher; + RDMAWorker* worker; std::vector buffers; int notify_fd = -1; bufferlist pending_bl; Mutex lock; std::vector wc; + bool is_server; + EventCallbackRef con_handler; + int tcp_fd = -1; + bool active;// qp is active ? + void notify(); ssize_t read_buffers(char* buf, size_t len); int post_work_request(std::vector&); public: - uint32_t local_qpn = 0; - uint32_t remote_qpn = 0; - RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, RDMAWorker *w); virtual ~RDMAConnectedSocketImpl(); - ostream &print(ostream &out) const { - return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}"; - }; - - Device *get_device() { return ibdev; }; - int get_ibport() { return ibport; }; + Device *get_device() { return ibdev; } void pass_wc(std::vector &&v); void get_wc(std::vector &w); - virtual int is_connected() override { return cmgr->connected; } + virtual int is_connected() override { return connected; } virtual ssize_t read(char* buf, size_t len) override; virtual ssize_t zero_copy_read(bufferptr &data) override; virtual ssize_t send(bufferlist &bl, bool more) override; - virtual void shutdown() override { cmgr->shutdown(); }; - virtual void close() override { cmgr->close(); }; + virtual void shutdown() override; + virtual void close() override; virtual int fd() const override { return notify_fd; } void fault(); const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } - QueuePair *get_qp() { return qp; }; ssize_t submit(bool more); + int activate(); void fin(); - void register_qp(QueuePair *qp); - void notify(); - - QueuePair *create_queue_pair(Device *d, int p); - void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); }; - int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); }; + void handle_connection(); + void cleanup(); + void set_accept_fd(int sd); + int try_connect(const entity_addr_t&, const SocketOptions &opt); + + class C_handle_connection : public EventCallback { + RDMAConnectedSocketImpl *csi; + bool active; + public: + C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {} + void do_request(int fd) { + if (active) + csi->handle_connection(); + } + void close() { + active = false; + } + }; }; -inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s) -{ - return s.print(out); -} - class RDMAServerSocketImpl : public ServerSocketImpl { - protected: CephContext *cct; Device *ibdev; int ibport; + NetHandler net; + int server_setup_socket; Infiniband* infiniband; RDMADispatcher *dispatcher; RDMAWorker *worker; @@ -150,10 +115,11 @@ class RDMAServerSocketImpl : public ServerSocketImpl { public: RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); - virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0; - virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0; - virtual void abort_accept() = 0; - virtual int fd() const = 0; + int listen(entity_addr_t &sa, const SocketOptions &opt); + virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; + virtual void abort_accept() override; + virtual int fd() const override { return server_setup_socket; } + int get_fd() { return server_setup_socket; } }; #endif diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index d7065baeb12b..43536d4f3b59 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -17,19 +17,13 @@ #include "msg/async/net_handler.h" #include "RDMAStack.h" #include "Device.h" -#include "RDMAConnTCP.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << " RDMAServerSocketImpl " RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) - : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a) -{ -} - -RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) - : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1) + : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) { ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str()); ibport = cct->_conf->ms_async_rdma_port_num; @@ -40,7 +34,7 @@ RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispat ibdev->init(ibport); } -int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt) +int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) { int rc = 0; server_setup_socket = net.create_socket(sa.get_family(), true); @@ -86,7 +80,7 @@ err: return -errno; } -int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) +int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { ldout(cct, 15) << __func__ << dendl; @@ -116,7 +110,7 @@ int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, e out->set_sockaddr((sockaddr*)&ss); net.set_priority(sd, opt.priority, out->get_family()); - RDMAConnectedSocketImpl *server; + RDMAConnectedSocketImpl* server; //Worker* w = dispatcher->get_stack()->get_worker(); server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast(w)); server->set_accept_fd(sd); @@ -127,7 +121,7 @@ int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, e return 0; } -void RDMAServerConnTCP::abort_accept() +void RDMAServerSocketImpl::abort_accept() { if (server_setup_socket >= 0) ::close(server_setup_socket); diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 03d7c1f1c3e2..ad49b7d2b79e 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -21,7 +21,6 @@ #include "common/deleter.h" #include "common/Tub.h" #include "RDMAStack.h" -#include "RDMAConnTCP.h" #include "Device.h" #define dout_subsys ceph_subsys_ms @@ -414,7 +413,7 @@ void RDMAWorker::initialize() int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { - auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); + auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { delete p;