]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "msg/async/rdma: Introduce RDMAConnMgr"
authorAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 07:33:45 +0000 (10:33 +0300)
committerAmir Vadai <amir@vadai.me>
Tue, 23 May 2017 14:04:20 +0000 (17:04 +0300)
This reverts commit 91bb13dceddb433966e6f775716e53ec5f523f5e.

Change-Id: I91dbe3c02651402f0e97e89151ec6a5154afdca3
Issue: 995322
Signed-off-by: Amir Vadai <amir@vadai.me>
src/CMakeLists.txt
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnTCP.cc [deleted file]
src/msg/async/rdma/RDMAConnTCP.h [deleted file]
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAConnectedSocketImpl.h
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc

index ad658e722b5e0c7e61eeddc9f95ce1ef85f8b534..e9f86e31cb57d363a476cb119b59605abe119189 100644 (file)
@@ -364,7 +364,6 @@ if(HAVE_RDMA)
     msg/async/rdma/Infiniband.cc
     msg/async/rdma/Device.cc
     msg/async/rdma/RDMAConnectedSocketImpl.cc
-    msg/async/rdma/RDMAConnTCP.cc
     msg/async/rdma/RDMAServerSocketImpl.cc
     msg/async/rdma/RDMAStack.cc)
 endif(HAVE_RDMA)
index de9d6e21b9a0421e9a221e2a9d71e3925061c275..020d1be47b6e93a84adc5a3aecf6d5581a1fb9ce 100644 (file)
@@ -26,6 +26,7 @@
 #define dout_prefix *_dout << "Infiniband "
 
 static const uint32_t MAX_INLINE_DATA = 0;
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
 
 Infiniband::QueuePair::QueuePair(
     CephContext *c, Device &device, ibv_qp_type type,
@@ -609,6 +610,93 @@ Device* Infiniband::get_device(const char* device_name)
   return device_list->get_device(device_name);
 }
 
+// 1 means no valid buffer read, 0 means got enough buffer
+// else return < 0 means error
+int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+  ssize_t r = ::read(sd, &msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+  if (r < 0) {
+    r = -errno;
+    lderr(cct) << __func__ << " got error " << r << ": "
+               << cpp_strerror(r) << dendl;
+  } else if (r == 0) { // valid disconnect message of length 0
+    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+  } else if ((size_t)r != sizeof(msg)) { // invalid message
+    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+    r = -EINVAL;
+  } else { // valid message
+    sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
+    wire_gid_to_gid(gid, &(im.gid));
+    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
+  }
+  return r;
+}
+
+int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  int retry = 0;
+  ssize_t r;
+
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+retry:
+  gid_to_wire_gid(&(im.gid), gid);
+  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
+  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+                 << ", " << im.peer_qpn << ", "  << gid  << dendl;
+  r = ::write(sd, msg, sizeof(msg));
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+
+  if ((size_t)r != sizeof(msg)) {
+    // FIXME need to handle EAGAIN instead of retry
+    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+      retry++;
+      goto retry;
+    }
+    if (r < 0)
+      lderr(cct) << __func__ << " send returned error " << errno << ": "
+                 << cpp_strerror(errno) << dendl;
+    else
+      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  return 0;
+}
+
+void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
+{
+  char tmp[9];
+  uint32_t v32;
+  int i;
+
+  for (tmp[8] = 0, i = 0; i < 4; ++i) {
+    memcpy(tmp, wgid + i * 8, 8);
+    sscanf(tmp, "%x", &v32);
+    *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
+  }
+}
+
+void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
+{
+  for (int i = 0; i < 4; ++i)
+    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
+}
+
 Infiniband::QueuePair::~QueuePair()
 {
   if (qp) {
index 98b710a0385d2ede7704128e4d9ab3f9de0bf1b6..2ac392738e4e81b9bf6c978967c8195385cf36b4 100644 (file)
@@ -144,6 +144,9 @@ class Infiniband {
   DeviceList *device_list;
   RDMADispatcher *dispatcher = nullptr;
 
+  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
+  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+
  public:
   explicit Infiniband(CephContext *c);
   ~Infiniband();
@@ -265,6 +268,8 @@ class Infiniband {
   };
 
  public:
+  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
 
@@ -280,9 +285,4 @@ class Infiniband {
   RDMADispatcher *get_dispatcher() { return dispatcher; }
 };
 
-inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp)
-{
-    return out << qp.get_local_qp_number();
-}
-
 #endif
diff --git a/src/msg/async/rdma/RDMAConnTCP.cc b/src/msg/async/rdma/RDMAConnTCP.cc
deleted file mode 100644 (file)
index f2c3c9a..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 XSKY <haomai@xsky.com>
- *
- * Author: Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "RDMAStack.h"
-#include "Device.h"
-#include "RDMAConnTCP.h"
-
-#define dout_subsys ceph_subsys_ms
-#undef dout_prefix
-#define dout_prefix *_dout << " RDMAConnTCP "
-
-static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
-
-// 1 means no valid buffer read, 0 means got enough buffer
-// else return < 0 means error
-int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
-  char msg[TCP_MSG_LEN];
-  char gid[33];
-  ssize_t r = ::read(sd, &msg, sizeof(msg));
-  // Drop incoming qpt
-  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
-      return -EINVAL;
-    }
-  }
-  if (r < 0) {
-    r = -errno;
-    lderr(cct) << __func__ << " got error " << r << ": "
-               << cpp_strerror(r) << dendl;
-  } else if (r == 0) { // valid disconnect message of length 0
-    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
-  } else if ((size_t)r != sizeof(msg)) { // invalid message
-    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
-    r = -EINVAL;
-  } else { // valid message
-    sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
-    wire_gid_to_gid(gid, &(im.gid));
-    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
-  }
-  return r;
-}
-
-int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
-  int retry = 0;
-  ssize_t r;
-
-  char msg[TCP_MSG_LEN];
-  char gid[33];
-retry:
-  gid_to_wire_gid(&(im.gid), gid);
-  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
-  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
-                 << ", " << im.peer_qpn << ", "  << gid  << dendl;
-  r = ::write(sd, msg, sizeof(msg));
-  // Drop incoming qpt
-  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
-      return -EINVAL;
-    }
-  }
-
-  if ((size_t)r != sizeof(msg)) {
-    // FIXME need to handle EAGAIN instead of retry
-    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
-      retry++;
-      goto retry;
-    }
-    if (r < 0)
-      lderr(cct) << __func__ << " send returned error " << errno << ": "
-                 << cpp_strerror(errno) << dendl;
-    else
-      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
-    return -errno;
-  }
-  return 0;
-}
-
-void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
-{
-  char tmp[9];
-  uint32_t v32;
-  int i;
-
-  for (tmp[8] = 0, i = 0; i < 4; ++i) {
-    memcpy(tmp, wgid + i * 8, 8);
-    sscanf(tmp, "%x", &v32);
-    *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
-  }
-}
-
-void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
-{
-  for (int i = 0; i < 4; ++i)
-    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
-}
-
-ostream &RDMAConnTCP::print(ostream &out) const
-{
-  return out << "TCP {tcp_fd: " << tcp_fd << "}";
-}
diff --git a/src/msg/async/rdma/RDMAConnTCP.h b/src/msg/async/rdma/RDMAConnTCP.h
deleted file mode 100644 (file)
index 39d96d5..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 XSKY <haomai@xsky.com>
- *
- * Author: Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
-#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
-
-#include "common/ceph_context.h"
-#include "common/debug.h"
-#include "common/errno.h"
-#include "msg/async/Stack.h"
-#include "Infiniband.h"
-#include "RDMAConnectedSocketImpl.h"
-
-class RDMAWorker;
-class RDMADispatcher;
-
-class RDMAConnTCP : public RDMAConnMgr {
-  class C_handle_connection : public EventCallback {
-    RDMAConnTCP *cst;
-    bool active;
-   public:
-    C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {};
-    void do_request(int fd) {
-      if (active)
-        cst->handle_connection();
-    };
-    void close() {
-      active = false;
-    };
-  };
-
-  IBSYNMsg peer_msg;
-  IBSYNMsg my_msg;
-  EventCallbackRef con_handler;
-  int tcp_fd = -1;
-
- private:
-  void handle_connection();
-  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  int activate();
-  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
-  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
-
- public:
-  RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
-             Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
-  virtual ~RDMAConnTCP();
-
-  virtual ostream &print(ostream &out) const override;
-
-  void set_accept_fd(int sd);
-
-  virtual void cleanup() override;
-  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
-};
-
-class RDMAServerConnTCP : public RDMAServerSocketImpl {
-  NetHandler net;
-  int server_setup_socket;
-
- public:
-  RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
-
-  int listen(entity_addr_t &sa, const SocketOptions &opt);
-  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
-  virtual void abort_accept() override;
-  virtual int fd() const override { return server_setup_socket; }
-};
-
-#endif
index ca7ac180d53a1cc01e32c4c25917c1fb0c334f89..e7066963de9aaf65a50bd31e2ee85f57614d04f4 100644 (file)
 
 #include "RDMAStack.h"
 #include "Device.h"
-#include "RDMAConnectedSocketImpl.h"
-#include "RDMAConnTCP.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
 
-RDMAConnMgr::RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
-                        Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
-  : cct(cct), socket(sock), infiniband(ib), dispatcher(s), worker(w),
-    is_server(false), active(false),
-    connected(0)
-{
-}
-
 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                                                 RDMAWorker *w)
-  : cct(cct), infiniband(ib), dispatcher(s), worker(w),
-    error(0), lock("RDMAConnectedSocketImpl::lock")
-{
-  cmgr = new RDMAConnTCP(cct, this, ib, s, w);
-}
-
-QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
+  : cct(cct), connected(0), error(0), infiniband(ib),
+    dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+    is_server(false), con_handler(new C_handle_connection(this)),
+    active(false)
 {
-  ibdev = d;
-  ibport = p;
-
-  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
-
-  local_qpn = qp->get_local_qp_number();
-
-  return qp;
-}
-
-RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
-                        Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
-  : RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this))
-{
-  Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
-  int ibport = cct->_conf->ms_async_rdma_port_num;
+  ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+  ibport = cct->_conf->ms_async_rdma_port_num;
 
   assert(ibdev);
   assert(ibport > 0);
 
   ibdev->init(ibport);
 
-  QueuePair *qp = socket->create_queue_pair(ibdev, ibport);
-
-  my_msg.qpn = socket->local_qpn;
+  qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+  my_msg.qpn = qp->get_local_qp_number();
   my_msg.psn = qp->get_initial_psn();
   my_msg.lid = ibdev->get_lid();
   my_msg.peer_qpn = 0;
   my_msg.gid = ibdev->get_gid();
-  socket->register_qp(qp);
-}
-
-void RDMAConnectedSocketImpl::register_qp(QueuePair *qp)
-{
   notify_fd = dispatcher->register_qp(qp, this);
   dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
   dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
 }
 
-RDMAConnTCP::~RDMAConnTCP()
-{
-  cleanup();
-  if (tcp_fd >= 0)
-    ::close(tcp_fd);
-}
-
 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 {
   ldout(cct, 20) << __func__ << " destruct." << dendl;
+  cleanup();
   worker->remove_pending_conn(this);
-  dispatcher->erase_qpn(local_qpn);
+  dispatcher->erase_qpn(my_msg.qpn);
   Mutex::Locker l(lock);
   if (notify_fd >= 0)
     ::close(notify_fd);
+  if (tcp_fd >= 0)
+    ::close(tcp_fd);
   error = ECONNRESET;
   int ret = 0;
   for (unsigned i=0; i < wc.size(); ++i) {
@@ -107,8 +70,6 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
     assert(ret == 0);
     dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
   }
-
-  delete cmgr;
 }
 
 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
@@ -129,13 +90,11 @@ void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
   w.swap(wc);
 }
 
-int RDMAConnTCP::activate()
+int RDMAConnectedSocketImpl::activate()
 {
   ibv_qp_attr qpa;
   int r;
 
-  socket->remote_qpn = peer_msg.qpn;
-
   // now connect up the qps and switch to RTR
   memset(&qpa, 0, sizeof(qpa));
   qpa.qp_state = IBV_QPS_RTR;
@@ -149,17 +108,15 @@ int RDMAConnTCP::activate()
   qpa.ah_attr.grh.hop_limit = 6;
   qpa.ah_attr.grh.dgid = peer_msg.gid;
 
-  qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
+  qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
 
   qpa.ah_attr.dlid = peer_msg.lid;
   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
   qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
+  qpa.ah_attr.port_num = (uint8_t)ibport;
 
   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
 
-  QueuePair *qp = socket->get_qp();
-
   r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
       IBV_QP_AV |
       IBV_QP_PATH_MTU |
@@ -213,15 +170,15 @@ int RDMAConnTCP::activate()
 
   if (!is_server) {
     connected = 1; //indicate successfully
-    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << *qp << dendl;
-    socket->submit(false);
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+    submit(false);
   }
   active = true;
 
   return 0;
 }
 
-int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
   ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
                  << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
   NetHandler net(cct);
@@ -242,7 +199,7 @@ int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
   my_msg.peer_qpn = 0;
-  r = send_msg(cct, tcp_fd, my_msg);
+  r = infiniband->send_msg(cct, tcp_fd, my_msg);
   if (r < 0)
     return r;
 
@@ -250,14 +207,14 @@ int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions
   return 0;
 }
 
-void RDMAConnTCP::handle_connection() {
-  ldout(cct, 20) << __func__ << " " << *socket << dendl;
-  int r = recv_msg(cct, tcp_fd, peer_msg);
+void RDMAConnectedSocketImpl::handle_connection() {
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
+  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
   if (r < 0) {
     if (r != -EAGAIN) {
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
       ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
-      socket->fault();
+      fault();
     }
     return;
   }
@@ -270,12 +227,12 @@ void RDMAConnTCP::handle_connection() {
       r = activate();
       assert(!r);
     }
-    socket->notify();
-    r = send_msg(cct, tcp_fd, my_msg);
+    notify();
+    r = infiniband->send_msg(cct, tcp_fd, my_msg);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
-      socket->fault();
+      fault();
     }
   } else {
     if (peer_msg.peer_qpn == 0) {// syn from client
@@ -283,11 +240,11 @@ void RDMAConnTCP::handle_connection() {
         ldout(cct, 10) << __func__ << " server is already active." << dendl;
         return ;
       }
-      r = send_msg(cct, tcp_fd, my_msg);
+      r = infiniband->send_msg(cct, tcp_fd, my_msg);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
-        socket->fault();
+        fault();
         return ;
       }
       r = activate();
@@ -295,28 +252,17 @@ void RDMAConnTCP::handle_connection() {
     } else { // ack from client
       connected = 1;
       cleanup();
-      socket->submit(false);
-      socket->notify();
+      submit(false);
+      notify();
     }
   }
 }
 
-void RDMAConnMgr::post_read()
-{
-  if (!is_server || connected)
-    return;
-
-  ldout(cct, 20) << __func__ << " we do not need last handshake, " << *socket << dendl;
-  connected = 1; //if so, we don't need the last handshake
-  cleanup();
-  socket->submit(false);
-}
-
 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
 {
   uint64_t i = 0;
   int r = ::read(notify_fd, &i, sizeof(i));
-  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << *qp << " r = " << r << dendl;
+  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
   if (error)
     return -error;
   ssize_t read = 0;
@@ -328,7 +274,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   if (cqe.empty())
     return read == 0 ? -EAGAIN : read;
 
-  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << *qp << dendl;
+  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
   for (size_t i = 0; i < cqe.size(); ++i) {
     ibv_wc* response = &cqe[i];
     assert(response->status == IBV_WC_SUCCESS);
@@ -338,7 +284,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
     if (response->byte_len == 0) {
       dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
-      if (cmgr->connected) {
+      if (connected) {
         error = ECONNRESET;
         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
       }
@@ -361,7 +307,12 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   }
 
   worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
-  cmgr->post_read();
+  if (is_server && connected == 0) {
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+    connected = 1; //if so, we don't need the last handshake
+    cleanup();
+    submit(false);
+  }
 
   if (read == 0 && error)
     return -error;
@@ -443,7 +394,7 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
 {
   if (error) {
-    if (!cmgr->active)
+    if (!active)
       return -EPIPE;
     return -error;
   }
@@ -453,12 +404,12 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
   {
     Mutex::Locker l(lock);
     pending_bl.claim_append(bl);
-    if (!cmgr->connected) {
-      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << *qp << dendl;
+    if (!connected) {
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
       return bytes;
     }
   }
-  ldout(cct, 20) << __func__ << " QP: " << *qp << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
   ssize_t r = submit(more);
   if (r < 0 && r != -EAGAIN)
     return r;
@@ -562,7 +513,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
 
 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 {
-  ldout(cct, 20) << __func__ << " QP: " << *qp << " " << tx_buffers[0] << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
   ibv_sge isge[tx_buffers.size()];
   uint32_t current_sge = 0;
@@ -629,7 +580,7 @@ void RDMAConnectedSocketImpl::fin() {
   }
 }
 
-void RDMAConnTCP::cleanup() {
+void RDMAConnectedSocketImpl::cleanup() {
   if (con_handler && tcp_fd >= 0) {
     (static_cast<C_handle_connection*>(con_handler))->close();
     worker->center.submit_to(worker->center.get_id(), [this]() {
@@ -649,32 +600,35 @@ void RDMAConnectedSocketImpl::notify()
   assert(ret = sizeof(i));
 }
 
-void RDMAConnMgr::shutdown()
+void RDMAConnectedSocketImpl::shutdown()
 {
-  if (!socket->error)
-    socket->fin();
-  socket->error = ECONNRESET;
+  if (!error)
+    fin();
+  error = ECONNRESET;
   active = false;
 }
 
-void RDMAConnMgr::close()
+void RDMAConnectedSocketImpl::close()
 {
-  shutdown();
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
 }
 
 void RDMAConnectedSocketImpl::fault()
 {
-  ldout(cct, 1) << __func__ << dendl;
+  ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
   /*if (qp) {
     qp->to_dead();
     qp = NULL;
     }*/
   error = ECONNRESET;
-  cmgr->connected = 1;
+  connected = 1;
   notify();
 }
 
-void RDMAConnTCP::set_accept_fd(int sd)
+void RDMAConnectedSocketImpl::set_accept_fd(int sd)
 {
   tcp_fd = sd;
   is_server = true;
index d48334b2eeb9d5ebccf9a50d924dc18f04f022f2..e9b1ef96fb4d322ee03b8a12896aad82018fa019 100644 (file)
 
 class RDMAWorker;
 class RDMADispatcher;
-class RDMAConnectedSocketImpl;
-
-typedef Infiniband::QueuePair QueuePair;
-
-class RDMAConnMgr {
-  friend class RDMAConnectedSocketImpl;
-
- protected:
-  CephContext *cct;
-  RDMAConnectedSocketImpl *socket;
-  Infiniband* infiniband;
-  RDMADispatcher* dispatcher;
-  RDMAWorker* worker;
-
-  bool is_server;
-  bool active;// qp is active ?
-  int connected;
-
- public:
-  RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
-             Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
-  virtual ~RDMAConnMgr() { };
-
-  virtual ostream &print(ostream &out) const = 0;
-
-  virtual void cleanup() = 0;
-  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0;
-  virtual void set_accept_fd(int sd) = 0;
-
-  void post_read();
-
-  void shutdown();
-  void close();
-};
-inline ostream& operator<<(ostream& out, const RDMAConnMgr &m)
-{
-    return m.print(out);
-}
 
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
-  friend class RDMAConnMgr;
-
- protected:
-  CephContext *cct;
-  Infiniband* infiniband;
-  RDMADispatcher* dispatcher;
-  RDMAWorker* worker;
-  Device *ibdev = nullptr;
-  int ibport = -1;
-  QueuePair *qp = nullptr;
-
  public:
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::CompletionChannel CompletionChannel;
   typedef Infiniband::CompletionQueue CompletionQueue;
 
  private:
-  RDMAConnMgr *cmgr;
+  CephContext *cct;
+  Infiniband::QueuePair *qp;
+  Device *ibdev;
+  int ibport;
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  int connected;
   int error;
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
   std::vector<Chunk*> buffers;
   int notify_fd = -1;
   bufferlist pending_bl;
 
   Mutex lock;
   std::vector<ibv_wc> wc;
+  bool is_server;
+  EventCallbackRef con_handler;
+  int tcp_fd = -1;
+  bool active;// qp is active ?
 
+  void notify();
   ssize_t read_buffers(char* buf, size_t len);
   int post_work_request(std::vector<Chunk*>&);
 
  public:
-  uint32_t local_qpn = 0;
-  uint32_t remote_qpn = 0;
-
   RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                           RDMAWorker *w);
   virtual ~RDMAConnectedSocketImpl();
 
-  ostream &print(ostream &out) const {
-    return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}";
-  };
-
-  Device *get_device() { return ibdev; };
-  int get_ibport() { return ibport; };
+  Device *get_device() { return ibdev; }
 
   void pass_wc(std::vector<ibv_wc> &&v);
   void get_wc(std::vector<ibv_wc> &w);
-  virtual int is_connected() override { return cmgr->connected; }
+  virtual int is_connected() override { return connected; }
 
   virtual ssize_t read(char* buf, size_t len) override;
   virtual ssize_t zero_copy_read(bufferptr &data) override;
   virtual ssize_t send(bufferlist &bl, bool more) override;
-  virtual void shutdown() override { cmgr->shutdown(); };
-  virtual void close() override { cmgr->close(); };
+  virtual void shutdown() override;
+  virtual void close() override;
   virtual int fd() const override { return notify_fd; }
   void fault();
   const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
-  QueuePair *get_qp() { return qp; };
   ssize_t submit(bool more);
+  int activate();
   void fin();
-  void register_qp(QueuePair *qp);
-  void notify();
-
-  QueuePair *create_queue_pair(Device *d, int p);
-  void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); };
-  int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); };
+  void handle_connection();
+  void cleanup();
+  void set_accept_fd(int sd);
+  int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
+  class C_handle_connection : public EventCallback {
+    RDMAConnectedSocketImpl *csi;
+    bool active;
+   public:
+    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+    void do_request(int fd) {
+      if (active)
+        csi->handle_connection();
+    }
+    void close() {
+      active = false;
+    }
+  };
 };
-inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s)
-{
-  return s.print(out);
-}
-
 
 class RDMAServerSocketImpl : public ServerSocketImpl {
- protected:
   CephContext *cct;
   Device *ibdev;
   int ibport;
+  NetHandler net;
+  int server_setup_socket;
   Infiniband* infiniband;
   RDMADispatcher *dispatcher;
   RDMAWorker *worker;
@@ -150,10 +115,11 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
  public:
   RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
 
-  virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0;
-  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0;
-  virtual void abort_accept() = 0;
-  virtual int fd() const = 0;
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
+  int get_fd() { return server_setup_socket; }
 };
 
 #endif
index d7065baeb12bc1a726c32e5362789e9170c30704..43536d4f3b5984d74c0078ea2381eac5fdae9fe5 100644 (file)
 #include "msg/async/net_handler.h"
 #include "RDMAStack.h"
 #include "Device.h"
-#include "RDMAConnTCP.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAServerSocketImpl "
 
 RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
-  : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a)
-{
-}
-
-RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
-  : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1)
+  : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
 {
   ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
   ibport = cct->_conf->ms_async_rdma_port_num;
@@ -40,7 +34,7 @@ RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispat
   ibdev->init(ibport);
 }
 
-int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
 {
   int rc = 0;
   server_setup_socket = net.create_socket(sa.get_family(), true);
@@ -86,7 +80,7 @@ err:
   return -errno;
 }
 
-int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
 {
   ldout(cct, 15) << __func__ << dendl;
 
@@ -116,7 +110,7 @@ int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, e
   out->set_sockaddr((sockaddr*)&ss);
   net.set_priority(sd, opt.priority, out->get_family());
 
-  RDMAConnectedSocketImpl *server;
+  RDMAConnectedSocketImplserver;
   //Worker* w = dispatcher->get_stack()->get_worker();
   server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
   server->set_accept_fd(sd);
@@ -127,7 +121,7 @@ int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, e
   return 0;
 }
 
-void RDMAServerConnTCP::abort_accept()
+void RDMAServerSocketImpl::abort_accept()
 {
   if (server_setup_socket >= 0)
     ::close(server_setup_socket);
index 03d7c1f1c3e2b851d660bc5e69f029f3411826c4..ad49b7d2b79e940e587458c5a2b11caef5dee34f 100644 (file)
@@ -21,7 +21,6 @@
 #include "common/deleter.h"
 #include "common/Tub.h"
 #include "RDMAStack.h"
-#include "RDMAConnTCP.h"
 #include "Device.h"
 
 #define dout_subsys ceph_subsys_ms
@@ -414,7 +413,7 @@ void RDMAWorker::initialize()
 
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;