]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async/rdma: Introduce RDMAConnMgr
authorAmir Vadai <amir@vadai.me>
Wed, 22 Mar 2017 07:05:21 +0000 (09:05 +0200)
committerAdir Lev <adirl@mellanox.com>
Wed, 29 Mar 2017 14:18:16 +0000 (17:18 +0300)
Encapsulate all connection establishment stuff in a new class -
RDMAConnMgr and make it a friend class of RDMAConnectedSocketImpl.
This class will be inherited for every type of connection establishment
- Currently only TCP is supported, very soon CM will be added too.

RDMAServerConnImpl which only handle connection establishment became an
abstract class and RDMAServerConnTCP is inherting it for connections of
type TCP.

Some of the code was left in its original file and place, and therefore
it looks misplaced. This was done to make it easier to review and rebase.
Once it is accepted a cleanup patch will be sent to move the code into
the right place.

Issue: 995322
Change-Id: I8b0e163525ec80c2452f4b6481bf696968cc1e51
Signed-off-by: Amir Vadai <amir@vadai.me>
src/CMakeLists.txt
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnTCP.cc [new file with mode: 0644]
src/msg/async/rdma/RDMAConnTCP.h [new file with mode: 0644]
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAConnectedSocketImpl.h
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc

index 179598a332463216d448d81437c17af028f71b47..c578aff56c73e6e5276721a421bb8c9d30fb0e42 100644 (file)
@@ -349,6 +349,7 @@ if(HAVE_RDMA)
     msg/async/rdma/Infiniband.cc
     msg/async/rdma/Device.cc
     msg/async/rdma/RDMAConnectedSocketImpl.cc
+    msg/async/rdma/RDMAConnTCP.cc
     msg/async/rdma/RDMAServerSocketImpl.cc
     msg/async/rdma/RDMAStack.cc)
 endif(HAVE_RDMA)
index 74f5c0436abd3d2088d49bf9616da90eb6628425..0a87b04ed5ac9acea420de1953516f3d46fdf6ee 100644 (file)
@@ -26,7 +26,6 @@
 #define dout_prefix *_dout << "Infiniband "
 
 static const uint32_t MAX_INLINE_DATA = 0;
-static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
 
 Infiniband::QueuePair::QueuePair(
     CephContext *c, Device &device, ibv_qp_type type,
@@ -613,93 +612,6 @@ Device* Infiniband::get_device(const char* device_name)
   return device_list->get_device(device_name);
 }
 
-// 1 means no valid buffer read, 0 means got enough buffer
-// else return < 0 means error
-int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
-  char msg[TCP_MSG_LEN];
-  char gid[33];
-  ssize_t r = ::read(sd, &msg, sizeof(msg));
-  // Drop incoming qpt
-  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
-      return -EINVAL;
-    }
-  }
-  if (r < 0) {
-    r = -errno;
-    lderr(cct) << __func__ << " got error " << r << ": "
-               << cpp_strerror(r) << dendl;
-  } else if (r == 0) { // valid disconnect message of length 0
-    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
-  } else if ((size_t)r != sizeof(msg)) { // invalid message
-    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
-    r = -EINVAL;
-  } else { // valid message
-    sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
-    wire_gid_to_gid(gid, &(im.gid));
-    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
-  }
-  return r;
-}
-
-int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
-  int retry = 0;
-  ssize_t r;
-
-  char msg[TCP_MSG_LEN];
-  char gid[33];
-retry:
-  gid_to_wire_gid(&(im.gid), gid);
-  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
-  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
-                 << ", " << im.peer_qpn << ", "  << gid  << dendl;
-  r = ::write(sd, msg, sizeof(msg));
-  // Drop incoming qpt
-  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
-      return -EINVAL;
-    }
-  }
-
-  if ((size_t)r != sizeof(msg)) {
-    // FIXME need to handle EAGAIN instead of retry
-    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
-      retry++;
-      goto retry;
-    }
-    if (r < 0)
-      lderr(cct) << __func__ << " send returned error " << errno << ": "
-                 << cpp_strerror(errno) << dendl;
-    else
-      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
-    return -errno;
-  }
-  return 0;
-}
-
-void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
-{
-  char tmp[9];
-  uint32_t v32;
-  int i;
-
-  for (tmp[8] = 0, i = 0; i < 4; ++i) {
-    memcpy(tmp, wgid + i * 8, 8);
-    sscanf(tmp, "%x", &v32);
-    *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
-  }
-}
-
-void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
-{
-  for (int i = 0; i < 4; ++i)
-    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
-}
-
 Infiniband::QueuePair::~QueuePair()
 {
   if (qp) {
index 682aeffbc6fabf129f3cc5eec4320dd687fc28d9..cd74d9e57fc8e669befb7346d2c948b0886bd259 100644 (file)
@@ -144,9 +144,6 @@ class Infiniband {
   DeviceList *device_list;
   RDMADispatcher *dispatcher = nullptr;
 
-  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
-  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
-
  public:
   explicit Infiniband(CephContext *c);
   ~Infiniband();
@@ -268,8 +265,6 @@ class Infiniband {
   };
 
  public:
-  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
 
@@ -285,4 +280,9 @@ class Infiniband {
   RDMADispatcher *get_dispatcher() { return dispatcher; }
 };
 
+inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp)
+{
+    return out << qp.get_local_qp_number();
+}
+
 #endif
diff --git a/src/msg/async/rdma/RDMAConnTCP.cc b/src/msg/async/rdma/RDMAConnTCP.cc
new file mode 100644 (file)
index 0000000..f2c3c9a
--- /dev/null
@@ -0,0 +1,117 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "RDMAStack.h"
+#include "Device.h"
+#include "RDMAConnTCP.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAConnTCP "
+
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+
+// 1 means no valid buffer read, 0 means got enough buffer
+// else return < 0 means error
+int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+  ssize_t r = ::read(sd, &msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+  if (r < 0) {
+    r = -errno;
+    lderr(cct) << __func__ << " got error " << r << ": "
+               << cpp_strerror(r) << dendl;
+  } else if (r == 0) { // valid disconnect message of length 0
+    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+  } else if ((size_t)r != sizeof(msg)) { // invalid message
+    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+    r = -EINVAL;
+  } else { // valid message
+    sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
+    wire_gid_to_gid(gid, &(im.gid));
+    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
+  }
+  return r;
+}
+
+int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  int retry = 0;
+  ssize_t r;
+
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+retry:
+  gid_to_wire_gid(&(im.gid), gid);
+  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
+  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+                 << ", " << im.peer_qpn << ", "  << gid  << dendl;
+  r = ::write(sd, msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+
+  if ((size_t)r != sizeof(msg)) {
+    // FIXME need to handle EAGAIN instead of retry
+    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+      retry++;
+      goto retry;
+    }
+    if (r < 0)
+      lderr(cct) << __func__ << " send returned error " << errno << ": "
+                 << cpp_strerror(errno) << dendl;
+    else
+      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  return 0;
+}
+
+void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
+{
+  char tmp[9];
+  uint32_t v32;
+  int i;
+
+  for (tmp[8] = 0, i = 0; i < 4; ++i) {
+    memcpy(tmp, wgid + i * 8, 8);
+    sscanf(tmp, "%x", &v32);
+    *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
+  }
+}
+
+void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
+{
+  for (int i = 0; i < 4; ++i)
+    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
+}
+
+ostream &RDMAConnTCP::print(ostream &out) const
+{
+  return out << "TCP {tcp_fd: " << tcp_fd << "}";
+}
diff --git a/src/msg/async/rdma/RDMAConnTCP.h b/src/msg/async/rdma/RDMAConnTCP.h
new file mode 100644 (file)
index 0000000..39d96d5
--- /dev/null
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
+#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include "Infiniband.h"
+#include "RDMAConnectedSocketImpl.h"
+
+class RDMAWorker;
+class RDMADispatcher;
+
+class RDMAConnTCP : public RDMAConnMgr {
+  class C_handle_connection : public EventCallback {
+    RDMAConnTCP *cst;
+    bool active;
+   public:
+    C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {};
+    void do_request(int fd) {
+      if (active)
+        cst->handle_connection();
+    };
+    void close() {
+      active = false;
+    };
+  };
+
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  EventCallbackRef con_handler;
+  int tcp_fd = -1;
+
+ private:
+  void handle_connection();
+  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  int activate();
+  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
+  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+
+ public:
+  RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
+             Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
+  virtual ~RDMAConnTCP();
+
+  virtual ostream &print(ostream &out) const override;
+
+  void set_accept_fd(int sd);
+
+  virtual void cleanup() override;
+  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
+};
+
+class RDMAServerConnTCP : public RDMAServerSocketImpl {
+  NetHandler net;
+  int server_setup_socket;
+
+ public:
+  RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
+};
+
+#endif
index 718b7a25c125b77876437547e6e2fb9a0a589328..80f3c014adb0db9f2f4d8f5afbe411faad461d2f 100644 (file)
 
 #include "RDMAStack.h"
 #include "Device.h"
+#include "RDMAConnectedSocketImpl.h"
+#include "RDMAConnTCP.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
 
+RDMAConnMgr::RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
+                        Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
+  : cct(cct), socket(sock), infiniband(ib), dispatcher(s), worker(w),
+    is_server(false), active(false),
+    connected(0)
+{
+}
+
 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                                                 RDMAWorker *w)
-  : cct(cct), connected(0), error(0), infiniband(ib),
-    dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
-    is_server(false), con_handler(new C_handle_connection(this)),
-    active(false)
+  : cct(cct), infiniband(ib), dispatcher(s), worker(w),
+    error(0), lock("RDMAConnectedSocketImpl::lock")
+{
+  cmgr = new RDMAConnTCP(cct, this, ib, s, w);
+}
+
+QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
 {
-  ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
-  ibport = cct->_conf->ms_async_rdma_port_num;
+  ibdev = d;
+  ibport = p;
+
+  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+
+  local_qpn = qp->get_local_qp_number();
+
+  return qp;
+}
+
+RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
+                        Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
+  : RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this))
+{
+  Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+  int ibport = cct->_conf->ms_async_rdma_port_num;
 
   assert(ibdev);
   assert(ibport > 0);
 
   ibdev->init(ibport);
 
-  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
-  my_msg.qpn = qp->get_local_qp_number();
+  QueuePair *qp = socket->create_queue_pair(ibdev, ibport);
+
+  my_msg.qpn = socket->local_qpn;
   my_msg.psn = qp->get_initial_psn();
   my_msg.lid = ibdev->get_lid();
   my_msg.peer_qpn = 0;
   my_msg.gid = ibdev->get_gid();
+  socket->register_qp(qp);
+}
+
+void RDMAConnectedSocketImpl::register_qp(QueuePair *qp)
+{
   notify_fd = dispatcher->register_qp(qp, this);
   dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
   dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
 }
 
+RDMAConnTCP::~RDMAConnTCP()
+{
+  cleanup();
+  if (tcp_fd >= 0)
+    ::close(tcp_fd);
+}
+
 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 {
   ldout(cct, 20) << __func__ << " destruct." << dendl;
-  cleanup();
   worker->remove_pending_conn(this);
-  dispatcher->erase_qpn(my_msg.qpn);
+  dispatcher->erase_qpn(local_qpn);
   Mutex::Locker l(lock);
   if (notify_fd >= 0)
     ::close(notify_fd);
-  if (tcp_fd >= 0)
-    ::close(tcp_fd);
   error = ECONNRESET;
   int ret = 0;
   for (unsigned i=0; i < wc.size(); ++i) {
@@ -68,6 +105,8 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
     ret = ibdev->post_chunk(buffers[i]);
     assert(ret == 0);
   }
+
+  delete cmgr;
 }
 
 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
@@ -88,11 +127,13 @@ void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
   w.swap(wc);
 }
 
-int RDMAConnectedSocketImpl::activate()
+int RDMAConnTCP::activate()
 {
   ibv_qp_attr qpa;
   int r;
 
+  socket->remote_qpn = peer_msg.qpn;
+
   // now connect up the qps and switch to RTR
   memset(&qpa, 0, sizeof(qpa));
   qpa.qp_state = IBV_QPS_RTR;
@@ -106,15 +147,17 @@ int RDMAConnectedSocketImpl::activate()
   qpa.ah_attr.grh.hop_limit = 6;
   qpa.ah_attr.grh.dgid = peer_msg.gid;
 
-  qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
+  qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
 
   qpa.ah_attr.dlid = peer_msg.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)ibport;
+  qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
 
   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
 
+  QueuePair *qp = socket->get_qp();
+
   r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
       IBV_QP_AV |
       IBV_QP_PATH_MTU |
@@ -168,15 +211,15 @@ int RDMAConnectedSocketImpl::activate()
 
   if (!is_server) {
     connected = 1; //indicate successfully
-    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
-    submit(false);
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << *qp << dendl;
+    socket->submit(false);
   }
   active = true;
 
   return 0;
 }
 
-int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
   ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
                  << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
   NetHandler net(cct);
@@ -197,7 +240,7 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
   my_msg.peer_qpn = 0;
-  r = infiniband->send_msg(cct, tcp_fd, my_msg);
+  r = send_msg(cct, tcp_fd, my_msg);
   if (r < 0)
     return r;
 
@@ -205,14 +248,14 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   return 0;
 }
 
-void RDMAConnectedSocketImpl::handle_connection() {
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
-  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+void RDMAConnTCP::handle_connection() {
+  ldout(cct, 20) << __func__ << " " << *socket << dendl;
+  int r = recv_msg(cct, tcp_fd, peer_msg);
   if (r < 0) {
     if (r != -EAGAIN) {
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
       ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
-      fault();
+      socket->fault();
     }
     return;
   }
@@ -225,12 +268,12 @@ void RDMAConnectedSocketImpl::handle_connection() {
       r = activate();
       assert(!r);
     }
-    notify();
-    r = infiniband->send_msg(cct, tcp_fd, my_msg);
+    socket->notify();
+    r = send_msg(cct, tcp_fd, my_msg);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
-      fault();
+      socket->fault();
     }
   } else {
     if (peer_msg.peer_qpn == 0) {// syn from client
@@ -238,11 +281,11 @@ void RDMAConnectedSocketImpl::handle_connection() {
         ldout(cct, 10) << __func__ << " server is already active." << dendl;
         return ;
       }
-      r = infiniband->send_msg(cct, tcp_fd, my_msg);
+      r = send_msg(cct, tcp_fd, my_msg);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
-        fault();
+        socket->fault();
         return ;
       }
       r = activate();
@@ -250,17 +293,28 @@ void RDMAConnectedSocketImpl::handle_connection() {
     } else { // ack from client
       connected = 1;
       cleanup();
-      submit(false);
-      notify();
+      socket->submit(false);
+      socket->notify();
     }
   }
 }
 
+void RDMAConnMgr::post_read()
+{
+  if (!is_server || connected)
+    return;
+
+  ldout(cct, 20) << __func__ << " we do not need last handshake, " << *socket << dendl;
+  connected = 1; //if so, we don't need the last handshake
+  cleanup();
+  socket->submit(false);
+}
+
 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
 {
   uint64_t i = 0;
   int r = ::read(notify_fd, &i, sizeof(i));
-  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
+  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << *qp << " r = " << r << dendl;
   if (error)
     return -error;
   ssize_t read = 0;
@@ -272,7 +326,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   if (cqe.empty())
     return read == 0 ? -EAGAIN : read;
 
-  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
+  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << *qp << dendl;
   for (size_t i = 0; i < cqe.size(); ++i) {
     ibv_wc* response = &cqe[i];
     assert(response->status == IBV_WC_SUCCESS);
@@ -282,7 +336,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
     if (response->byte_len == 0) {
       dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
-      if (connected) {
+      if (cmgr->connected) {
         error = ECONNRESET;
         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
       }
@@ -303,12 +357,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   }
 
   worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
-  if (is_server && connected == 0) {
-    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
-    connected = 1; //if so, we don't need the last handshake
-    cleanup();
-    submit(false);
-  }
+  cmgr->post_read();
 
   if (read == 0 && error)
     return -error;
@@ -389,7 +438,7 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
 {
   if (error) {
-    if (!active)
+    if (!cmgr->active)
       return -EPIPE;
     return -error;
   }
@@ -399,12 +448,12 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
   {
     Mutex::Locker l(lock);
     pending_bl.claim_append(bl);
-    if (!connected) {
-      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+    if (!cmgr->connected) {
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << *qp << dendl;
       return bytes;
     }
   }
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << *qp << dendl;
   ssize_t r = submit(more);
   if (r < 0 && r != -EAGAIN)
     return r;
@@ -508,7 +557,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
 
 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 {
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << *qp << " " << tx_buffers[0] << dendl;
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
   ibv_sge isge[tx_buffers.size()];
   uint32_t current_sge = 0;
@@ -575,7 +624,7 @@ void RDMAConnectedSocketImpl::fin() {
   }
 }
 
-void RDMAConnectedSocketImpl::cleanup() {
+void RDMAConnTCP::cleanup() {
   if (con_handler && tcp_fd >= 0) {
     (static_cast<C_handle_connection*>(con_handler))->close();
     worker->center.submit_to(worker->center.get_id(), [this]() {
@@ -595,35 +644,32 @@ void RDMAConnectedSocketImpl::notify()
   assert(ret = sizeof(i));
 }
 
-void RDMAConnectedSocketImpl::shutdown()
+void RDMAConnMgr::shutdown()
 {
-  if (!error)
-    fin();
-  error = ECONNRESET;
+  if (!socket->error)
+    socket->fin();
+  socket->error = ECONNRESET;
   active = false;
 }
 
-void RDMAConnectedSocketImpl::close()
+void RDMAConnMgr::close()
 {
-  if (!error)
-    fin();
-  error = ECONNRESET;
-  active = false;
+  shutdown();
 }
 
 void RDMAConnectedSocketImpl::fault()
 {
-  ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
+  ldout(cct, 1) << __func__ << dendl;
   /*if (qp) {
     qp->to_dead();
     qp = NULL;
     }*/
   error = ECONNRESET;
-  connected = 1;
+  cmgr->connected = 1;
   notify();
 }
 
-void RDMAConnectedSocketImpl::set_accept_fd(int sd)
+void RDMAConnTCP::set_accept_fd(int sd)
 {
   tcp_fd = sd;
   is_server = true;
index e9b1ef96fb4d322ee03b8a12896aad82018fa019..d48334b2eeb9d5ebccf9a50d924dc18f04f022f2 100644 (file)
 
 class RDMAWorker;
 class RDMADispatcher;
+class RDMAConnectedSocketImpl;
+
+typedef Infiniband::QueuePair QueuePair;
+
+class RDMAConnMgr {
+  friend class RDMAConnectedSocketImpl;
+
+ protected:
+  CephContext *cct;
+  RDMAConnectedSocketImpl *socket;
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
+
+  bool is_server;
+  bool active;// qp is active ?
+  int connected;
+
+ public:
+  RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
+             Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
+  virtual ~RDMAConnMgr() { };
+
+  virtual ostream &print(ostream &out) const = 0;
+
+  virtual void cleanup() = 0;
+  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0;
+  virtual void set_accept_fd(int sd) = 0;
+
+  void post_read();
+
+  void shutdown();
+  void close();
+};
+inline ostream& operator<<(ostream& out, const RDMAConnMgr &m)
+{
+    return m.print(out);
+}
 
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+  friend class RDMAConnMgr;
+
+ protected:
+  CephContext *cct;
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
+  Device *ibdev = nullptr;
+  int ibport = -1;
+  QueuePair *qp = nullptr;
+
  public:
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::CompletionChannel CompletionChannel;
   typedef Infiniband::CompletionQueue CompletionQueue;
 
  private:
-  CephContext *cct;
-  Infiniband::QueuePair *qp;
-  Device *ibdev;
-  int ibport;
-  IBSYNMsg peer_msg;
-  IBSYNMsg my_msg;
-  int connected;
+  RDMAConnMgr *cmgr;
   int error;
-  Infiniband* infiniband;
-  RDMADispatcher* dispatcher;
-  RDMAWorker* worker;
   std::vector<Chunk*> buffers;
   int notify_fd = -1;
   bufferlist pending_bl;
 
   Mutex lock;
   std::vector<ibv_wc> wc;
-  bool is_server;
-  EventCallbackRef con_handler;
-  int tcp_fd = -1;
-  bool active;// qp is active ?
 
-  void notify();
   ssize_t read_buffers(char* buf, size_t len);
   int post_work_request(std::vector<Chunk*>&);
 
  public:
+  uint32_t local_qpn = 0;
+  uint32_t remote_qpn = 0;
+
   RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                           RDMAWorker *w);
   virtual ~RDMAConnectedSocketImpl();
 
-  Device *get_device() { return ibdev; }
+  ostream &print(ostream &out) const {
+    return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}";
+  };
+
+  Device *get_device() { return ibdev; };
+  int get_ibport() { return ibport; };
 
   void pass_wc(std::vector<ibv_wc> &&v);
   void get_wc(std::vector<ibv_wc> &w);
-  virtual int is_connected() override { return connected; }
+  virtual int is_connected() override { return cmgr->connected; }
 
   virtual ssize_t read(char* buf, size_t len) override;
   virtual ssize_t zero_copy_read(bufferptr &data) override;
   virtual ssize_t send(bufferlist &bl, bool more) override;
-  virtual void shutdown() override;
-  virtual void close() override;
+  virtual void shutdown() override { cmgr->shutdown(); };
+  virtual void close() override { cmgr->close(); };
   virtual int fd() const override { return notify_fd; }
   void fault();
   const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+  QueuePair *get_qp() { return qp; };
   ssize_t submit(bool more);
-  int activate();
   void fin();
-  void handle_connection();
-  void cleanup();
-  void set_accept_fd(int sd);
-  int try_connect(const entity_addr_t&, const SocketOptions &opt);
-
-  class C_handle_connection : public EventCallback {
-    RDMAConnectedSocketImpl *csi;
-    bool active;
-   public:
-    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
-    void do_request(int fd) {
-      if (active)
-        csi->handle_connection();
-    }
-    void close() {
-      active = false;
-    }
-  };
+  void register_qp(QueuePair *qp);
+  void notify();
+
+  QueuePair *create_queue_pair(Device *d, int p);
+  void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); };
+  int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); };
 };
+inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s)
+{
+  return s.print(out);
+}
+
 
 class RDMAServerSocketImpl : public ServerSocketImpl {
+ protected:
   CephContext *cct;
   Device *ibdev;
   int ibport;
-  NetHandler net;
-  int server_setup_socket;
   Infiniband* infiniband;
   RDMADispatcher *dispatcher;
   RDMAWorker *worker;
@@ -115,11 +150,10 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
  public:
   RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
 
-  int listen(entity_addr_t &sa, const SocketOptions &opt);
-  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
-  virtual void abort_accept() override;
-  virtual int fd() const override { return server_setup_socket; }
-  int get_fd() { return server_setup_socket; }
+  virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0;
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0;
+  virtual void abort_accept() = 0;
+  virtual int fd() const = 0;
 };
 
 #endif
index 93a22532796d132c3ff7517b2e7d616a2d67d56d..f2b6b79b4233ef18fa6f84a0f8245b527d5502bc 100644 (file)
 #include "msg/async/net_handler.h"
 #include "RDMAStack.h"
 #include "Device.h"
+#include "RDMAConnTCP.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAServerSocketImpl "
 
 RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
-  : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
+  : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a)
+{
+}
+
+RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+  : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1)
 {
   ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
   ibport = cct->_conf->ms_async_rdma_port_num;
@@ -34,7 +40,7 @@ RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMA
   ibdev->init(ibport);
 }
 
-int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt)
 {
   int rc = 0;
   server_setup_socket = net.create_socket(sa.get_family(), true);
@@ -80,7 +86,7 @@ err:
   return -errno;
 }
 
-int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
 {
   ldout(cct, 15) << __func__ << dendl;
 
@@ -106,7 +112,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
   }
   net.set_priority(sd, opt.priority, out->get_family());
 
-  RDMAConnectedSocketImplserver;
+  RDMAConnectedSocketImpl *server;
   //Worker* w = dispatcher->get_stack()->get_worker();
   server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
   server->set_accept_fd(sd);
@@ -119,7 +125,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
   return 0;
 }
 
-void RDMAServerSocketImpl::abort_accept()
+void RDMAServerConnTCP::abort_accept()
 {
   if (server_setup_socket >= 0)
     ::close(server_setup_socket);
index dc856b965e8e3c18241cd75dfe0a0f5428a164bb..b4e7fab3483abdee7130cf9945a39cb88e25bae0 100644 (file)
@@ -21,6 +21,7 @@
 #include "common/deleter.h"
 #include "common/Tub.h"
 #include "RDMAStack.h"
+#include "RDMAConnTCP.h"
 #include "Device.h"
 
 #define dout_subsys ceph_subsys_ms
@@ -410,7 +411,7 @@ void RDMAWorker::initialize()
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;