]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: use RDMADispatch to poll rx channel
authorHaomai Wang <haomai@xsky.com>
Thu, 13 Oct 2016 11:20:30 +0000 (19:20 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 1 Nov 2016 07:34:04 +0000 (15:34 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index fb34fecf9a75625ba80d03dc092b1e871869d8b8..eac08c6d290d32cbe1b8613052cbe83711699c03 100644 (file)
@@ -17,7 +17,6 @@
 #include "Infiniband.h"\r
 #include "common/errno.h"\r
 #include "common/debug.h"\r
-#include "RDMAStack.h"\r
 \r
 #define dout_subsys ceph_subsys_ms\r
 #undef dout_prefix\r
@@ -61,7 +60,7 @@ Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(ne
   }\r
 }\r
 \r
-Infiniband::Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c), stack(s)\r
+Infiniband::Infiniband(CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c)\r
 {\r
   device = device_list.get_device(device_name.c_str());\r
   assert(device);\r
@@ -124,7 +123,7 @@ ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_s
  *      QueuePair on success or NULL if init fails\r
  * See QueuePair::QueuePair for parameter documentation.\r
  */\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type)\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *c, ibv_qp_type type)\r
 {\r
   Infiniband::CompletionChannel* cc = create_comp_channel();\r
   if (!cc)\r
@@ -137,8 +136,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type)
     return NULL;\r
   }\r
 \r
-  RDMAWorker* w = static_cast<RDMAWorker*>(stack->get_worker());\r
-  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, w->get_tx_cq(), cq, max_send_wr, max_recv_wr);\r
+  Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, c, cq, max_send_wr, max_recv_wr);\r
   if (qp->init()) {\r
     delete cc;\r
     delete cq;\r
@@ -208,6 +206,39 @@ int Infiniband::QueuePair::init()
   return 0;\r
 }\r
 \r
+/**\r
+ * Change RC QueuePair into the ERROR state. This is necessary modify\r
+ * the Queue Pair into the Error state and poll all of the relevant\r
+ * Work Completions prior to destroying a Queue Pair.\r
+ * Since destroying a Queue Pair does not guarantee that its Work\r
+ * Completions are removed from the CQ upon destruction. Even if the\r
+ * Work Completions are already in the CQ, it might not be possible to\r
+ * retrieve them. If the Queue Pair is associated with an SRQ, it is\r
+ * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED\r
+ *\r
+ * \return\r
+ *      -errno if the QueuePair can't switch to ERROR\r
+ *      0 for success.\r
+ */\r
+int Infiniband::QueuePair::to_dead()\r
+{\r
+  if (dead)\r
+    return 0;\r
+  ibv_qp_attr qpa;\r
+  memset(&qpa, 0, sizeof(qpa));\r
+  qpa.qp_state = IBV_QPS_ERR;\r
+\r
+  int mask = IBV_QP_STATE;\r
+  int ret = ibv_modify_qp(qp, &qpa, mask);\r
+  if (ret) {\r
+    lderr(infiniband.cct) << __func__ << " failed to transition to ERROR state: "\r
+                           << cpp_strerror(errno) << dendl;\r
+    return -errno;\r
+  }\r
+  dead = true;\r
+  return ret;\r
+}\r
+\r
 int Infiniband::post_chunk(Chunk* chunk)\r
 {\r
   ibv_sge isge;\r
@@ -284,7 +315,8 @@ Infiniband::QueuePair::QueuePair(
   initial_psn(0),\r
   max_send_wr(max_send_wr),\r
   max_recv_wr(max_recv_wr),\r
-  q_key(q_key)\r
+  q_key(q_key),\r
+  dead(false)\r
 {\r
   initial_psn = lrand48() & 0xffffff;\r
   if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
index 22dee795de44ac9058214f12f697f3347afe86a5..3ed5f85d39456b2bdbbe0891df1338a4306f6c89 100644 (file)
@@ -284,10 +284,9 @@ class Infiniband {
         return 0;\r
       }\r
 \r
-      int take_back(Chunk* ck) {\r
+      void take_back(Chunk* ck) {\r
         Mutex::Locker l(lock);\r
         free_chunks.push_back(ck);\r
-        return 0;\r
       }\r
 \r
       int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
@@ -312,7 +311,7 @@ class Infiniband {
       uint32_t chunk_size;\r
       Mutex lock;\r
       std::vector<Chunk*> free_chunks;\r
-      set<Chunk*> all_chunks;\r
+      std::set<Chunk*> all_chunks;\r
       char* base;\r
     };\r
 \r
@@ -357,9 +356,11 @@ class Infiniband {
       send = new Cluster(*this, size);\r
       send->add(tx_num);\r
     }\r
-    int return_tx(Chunk* c) {\r
-      c->clear();\r
-      return send->take_back(c);\r
+    void return_tx(std::vector<Chunk*> &chunks) {\r
+      for (auto c : chunks) {\r
+        c->clear();\r
+        send->take_back(c);\r
+      }\r
     }\r
 \r
     int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
@@ -396,8 +397,7 @@ class Infiniband {
 \r
  public:\r
   NetHandler net;\r
-  RDMAStack* stack;\r
-  explicit Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name);\r
+  explicit Infiniband(CephContext *c, const std::string &device_name);\r
 \r
   /**\r
    * Destroy an Infiniband object.\r
@@ -459,11 +459,6 @@ class Infiniband {
   class QueuePair {\r
    public:\r
     QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port,  ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
-    // exists solely as superclass constructor for MockQueuePair derivative\r
-    explicit QueuePair(Infiniband& infiniband):\r
-      infiniband(infiniband), type(IBV_QPT_RC), ctxt(NULL), ib_physical_port(-1),\r
-      pd(NULL), srq(NULL), qp(NULL), txcq(NULL), rxcq(NULL),\r
-      initial_psn(-1) {}\r
     ~QueuePair();\r
 \r
     int init();\r
@@ -551,9 +546,9 @@ class Infiniband {
     ibv_qp* get_qp() const { return qp; }\r
     Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }\r
     Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }\r
-    int to_reset();\r
     int to_dead();\r
-    int get_fd() { return fd; }\r
+    bool is_dead() const { return dead; }\r
+\r
    private:\r
     Infiniband&  infiniband;     // Infiniband to which this QP belongs\r
     ibv_qp_type  type;           // QP type (IBV_QPT_RC, etc.)\r
@@ -568,13 +563,13 @@ class Infiniband {
     uint32_t     max_send_wr;\r
     uint32_t     max_recv_wr;\r
     uint32_t     q_key;\r
-    int fd;\r
+    bool dead;\r
   };\r
 \r
  public:\r
   typedef MemoryManager::Cluster Cluster;\r
   typedef MemoryManager::Chunk Chunk;\r
-  QueuePair* create_queue_pair(ibv_qp_type type);\r
+  QueuePair* create_queue_pair(CompletionQueue *w, ibv_qp_type type);\r
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
   int post_chunk(Chunk* chunk);\r
   int post_channel_cluster();\r
@@ -592,6 +587,7 @@ class Infiniband {
   ibv_gid get_gid() { return device->get_gid(); }\r
   MemoryManager* get_memory_manager() { return memory_manager; }\r
   Device* get_device() { return device; }\r
+  int get_async_fd() { return device->ctxt->async_fd; }\r
   static const char* wc_status_to_string(int status);\r
 };\r
 \r
index 3e93ebaa255305d42775fdc987b006d350438a88..c6594f30824fc81928148d51da5b702586bdec0b 100644 (file)
@@ -101,57 +101,39 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
 {\r
   ldout(cct, 20) << __func__ << " need to read bytes: " << len  << " buffers size: " << buffers.size() << dendl;\r
 \r
+  if (error)\r
+    return -error;\r
   ssize_t read = 0;\r
   if (!buffers.empty())\r
     read = read_buffers(buf,len);\r
 \r
-  static const int MAX_COMPLETIONS = 16;\r
-  ibv_wc wc[MAX_COMPLETIONS];\r
+  std::vector<ibv_wc> cqe;\r
+  get_wc(cqe);\r
+  if (cqe.empty())\r
+    return read == 0 ? -EAGAIN : read;\r
 \r
-  bool rearmed = false;\r
-  int n;\r
- again:\r
-  n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
-  ldout(cct, 20) << __func__ << " poll completion queue got " << n << " responses."<< dendl;\r
-  for (int i = 0; i < n; ++i) {\r
-    ibv_wc* response = &wc[i];\r
-    ldout(cct, 20) << __func__ << " cqe  " << response->byte_len << " bytes." << dendl;\r
+  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses."<< dendl;\r
+  for (size_t i = 0; i < cqe.size(); ++i) {\r
+    ibv_wc* response = &cqe[i];\r
+    assert(response->status == IBV_WC_SUCCESS);\r
+    ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl;\r
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
     chunk->prepare_read(response->byte_len);\r
-    if (!response->byte_len) {\r
-      wait_close = true;\r
-      return 0;\r
-    }\r
-    if (response->status != IBV_WC_SUCCESS) {\r
-      lderr(cct) << __func__ << " poll cqe failed! " << " number: " << n << ", status: "<< response->status << cpp_strerror(errno) << dendl;\r
-      assert(0);\r
+    assert(!response->byte_len);\r
+    if (read == (ssize_t)len) {\r
+      buffers.push_back(chunk);\r
+      ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
+    } else if (read + response->byte_len > (ssize_t)len) {\r
+      read += chunk->read(buf+read, (ssize_t)len-read);\r
+      buffers.push_back(chunk);\r
+      ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
     } else {\r
-      if (read == (ssize_t)len) {\r
-        buffers.push_back(chunk);\r
-        ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
-      } else if (read + response->byte_len > (ssize_t)len) {\r
-        read += chunk->read(buf+read, (ssize_t)len-read);\r
-        buffers.push_back(chunk);\r
-        ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
-      } else {\r
-        read += chunk->read(buf+read, response->byte_len);\r
-        assert(infiniband->post_chunk(chunk) == 0);\r
-      }\r
+      read += chunk->read(buf+read, response->byte_len);\r
+      assert(infiniband->post_chunk(chunk) == 0);\r
     }\r
   }\r
 \r
-  if (n)\r
-   goto again;\r
-  if (!rearmed) {\r
-     rx_cq->rearm_notify();\r
-     rearmed = true;\r
-     // Clean up cq events after rearm notify ensure no new incoming event\r
-     // arrived between polling and rearm\r
-     goto again;\r
-  }\r
-  if (read == 0)\r
-    return -EAGAIN;\r
-  return read;\r
+  return read == 0 ? -EAGAIN : read;\r
 }\r
 \r
 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)\r
@@ -180,57 +162,44 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
 \r
 ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)\r
 {\r
-  ssize_t size = 0;\r
+  if (error)\r
+    return -error;\r
   static const int MAX_COMPLETIONS = 16;\r
   ibv_wc wc[MAX_COMPLETIONS];\r
-\r
-  bool rearmed = false;\r
-  int n;\r
- again:\r
-  n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
-  ldout(cct, 20) << __func__ << " pool completion queue got " << n << " responses."<< dendl;\r
+  ssize_t size;\r
 \r
   ibv_wc*  response;\r
   Chunk* chunk;\r
   bool loaded = false;\r
   auto iter = buffers.begin();\r
-  if(iter != buffers.end()) {\r
+  if (iter != buffers.end()) {\r
     chunk = *iter;\r
-    if (chunk->bound == 0) {\r
-      wait_close = true;\r
-      return 0;\r
-    }\r
     auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
     buffers.erase(iter);\r
     loaded = true;\r
     size = chunk->bound;\r
   }\r
 \r
-  for (int i = 0; i < n; ++i) {\r
+  std::vector<ibv_wc> cqe;\r
+  get_wc(cqe);\r
+  if (cqe.empty())\r
+    return size == 0 ? -EAGAIN : size;\r
+\r
+  ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;\r
+\r
+  for (size_t i = 0; i < cqe.size(); ++i) {\r
     response = &wc[i];\r
     chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
     chunk->prepare_read(response->byte_len);\r
     if(!loaded && i == 0) {\r
-      if (chunk->bound == 0) {\r
-        wait_close = true;\r
-        return 0;\r
-      }\r
       auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
       size = chunk->bound;\r
       continue;\r
     }\r
     buffers.push_back(chunk);\r
+    iter++;\r
   }\r
 \r
-  if (n)\r
-   goto again;\r
-  if (!rearmed) {\r
-     rx_cq->rearm_notify();\r
-     rearmed = true;\r
-     // Clean up cq events after rearm notify ensure no new incoming event\r
-     // arrived between polling and rearm\r
-     goto again;\r
-  }\r
   if (size == 0)\r
     return -EAGAIN;\r
   return size;\r
@@ -238,18 +207,33 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
 \r
 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
 {\r
+  if (error)\r
+    return -error;\r
   size_t bytes = bl.length();\r
   if (!bytes)\r
     return 0;\r
-  vector<Chunk*> tx_buffers;\r
-  if (infiniband->get_tx_buffers(tx_buffers, bytes) < 0) {\r
+  pending_bl.claim_append(bl);\r
+  ssize_t r = submit(more);\r
+  if (r < 0 && r != -EAGAIN)\r
+    return r;\r
+  return bytes;\r
+}\r
+\r
+ssize_t RDMAConnectedSocketImpl::submit(bool more)\r
+{\r
+  if (error)\r
+    return -error;\r
+  std::vector<Chunk*> tx_buffers;\r
+  size_t bytes = pending_bl.length();\r
+  if (worker->reserve_message_buffer(this, tx_buffers, bytes) < 0) {\r
     ldout(cct, 10) << __func__ << " no enough buffers" << dendl;\r
-    return 0;\r
+    pending_bl.claim_append(pending_bl);\r
+    return -EAGAIN;\r
   }\r
   ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl;\r
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
-  list<bufferptr>::const_iterator it = bl.buffers().begin();\r
-  while (it != bl.buffers().end()) {\r
+  list<bufferptr>::const_iterator it = pending_bl.buffers().begin();\r
+  while (it != pending_bl.buffers().end()) {\r
     const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());\r
     uint32_t copied = 0;\r
     //  ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length:  " << it->length()  << dendl;\r
@@ -270,7 +254,7 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
     return r;\r
 \r
   ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
-  bl.clear();\r
+  pending_bl.clear();\r
   return bytes;\r
 }\r
 \r
@@ -318,22 +302,3 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
   }\r
   return 0;\r
 }\r
-\r
-void RDMAConnectedSocketImpl::fin() {\r
-  //ibv_sge list;\r
-  //memset(&list, 0, sizeof(list));\r
-  ibv_send_wr wr;\r
-  memset(&wr, 0, sizeof(wr));\r
-  wr.wr_id = reinterpret_cast<uint64_t>(this);\r
-  wr.num_sge = 0;\r
-  //wr.sg_list = &list;\r
-  wr.opcode = IBV_WR_SEND;\r
-  wr.send_flags = IBV_SEND_SIGNALED;\r
-  ibv_send_wr* bad_tx_work_request;\r
-  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {\r
-    lderr(cct) << __func__ << " failed to send FIN"\r
-               << "(most probably should be peer not ready): "\r
-               << cpp_strerror(errno) << dendl;\r
-    return ;\r
-  }\r
-}\r
index 0a148f9f6e7ba3a3618ef72eba77bf51093bb7d8..6f0c7de8e60f1c3b641a3930dba37b395f0d2327 100644 (file)
@@ -78,8 +78,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
       ldout(cct, 1) << __func__ << " recv msg not whole." << dendl;\r
       continue;\r
     } else {\r
-      //RDMAWorker* w = static_cast<RDMAWorker*>(infiniband->stack->get_worker());\r
-      server = new RDMAConnectedSocketImpl(cct, infiniband, NULL, msg);\r
+      server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, worker, msg);\r
       msg = server->get_my_msg();\r
       r = infiniband->send_udp_msg(server_setup_socket, msg, addr);\r
       server->activate();\r
index ca92fae38743de6f8b9de89acef455812416c050..104fa3bf1700ba41ce27b084a2786b7d66e1c49d 100644 (file)
@@ -14,8 +14,8 @@
  *
  */
 
-#include "RDMAStack.h"
 #include "include/str_list.h"
+#include "RDMAStack.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 
 static Infiniband* global_infiniband;
 
+RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
+  : Worker(c, i), stack(nullptr), infiniband(NULL),
+    tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL)
+{}
+
 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
 {
-  auto p = new RDMAServerSocketImpl(cct, infiniband, sa);
+  auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -38,7 +43,7 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, this);
+  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
   entity_addr_t sa;
   memcpy(&sa, &addr, sizeof(addr));
 
@@ -46,7 +51,7 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
   ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl;
   ldout(cct, 20) << __func__ << " my syn msg :  < " << msg.qpn << ", " << msg.psn <<  ", " << msg.lid << ">"<< dendl;
 
-  client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
+  int client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
   if (client_setup_socket == -1) {
     lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl;
     return -errno;
@@ -77,6 +82,7 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
   assert(!r);
   std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
   *socket = ConnectedSocket(std::move(csi));
+  ::close(client_setup_socket);
 
   return 0;
 }
@@ -85,7 +91,8 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
 {
   if (!global_infiniband)
     global_infiniband = new Infiniband(
-        this, cct, cct->_conf->ms_async_rdma_device_name);
+        cct, cct->_conf->ms_async_rdma_device_name);
+  dispatcher = new RDMADispatcher(cct, global_infiniband);
 }
 
 void RDMAWorker::initialize()
@@ -97,6 +104,45 @@ void RDMAWorker::initialize()
   memory_manager = infiniband->get_memory_manager();
 }
 
+int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+{
+  int r = infiniband->get_tx_buffers(c, bytes);
+  if (r == 0) {
+    stack->get_dispatcher()->inflight += c.size();
+    return r;
+  }
+
+  if (pending_sent_conns.back() != o)
+    pending_sent_conns.push_back(o);
+  return 0;
+}
+
+/**
+ * Add the given Chunks to the given free queue.
+ *
+ * \param[in] chunks
+ *      The Chunks to enqueue.
+ * \return
+ *      0 if success or -1 for failure
+ */
+int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+  stack->get_dispatcher()->inflight -= chunks.size();
+  infiniband->get_memory_manager()->return_tx(chunks);
+  while (!pending_sent_conns.empty()) {
+    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+    ssize_t r = o->submit(false);
+    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+    if (r < 0) {
+      if (r == -EAGAIN)
+        break;
+      o->fault();
+    }
+    pending_sent_conns.pop_front();
+  }
+  return 0;
+}
+
 void RDMAWorker::handle_tx_event()
 {
   ldout(cct, 20) << __func__ << dendl;
@@ -105,6 +151,8 @@ void RDMAWorker::handle_tx_event()
 
   static const int MAX_COMPLETIONS = 16;
   ibv_wc wc[MAX_COMPLETIONS];
+  std::vector<Chunk*> tx_chunks;
+  tx_chunks.reserve(MAX_COMPLETIONS);
 
   bool rearmed = false;
   int n;
@@ -129,17 +177,23 @@ void RDMAWorker::handle_tx_event()
                                << response->wr_id << ") status(" << response->status << "): "
                                << infiniband->wc_status_to_string(response->status) << dendl;
       }
-      assert(0);
+      RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
+      if (conn) {
+        conn->fault();
+      } else {
+        ldout(cct, 0) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+      }
     }
 
-    if (memory_manager->is_tx_chunk(chunk))
-      infiniband->get_memory_manager()->return_tx(chunk);
-    else
-      ldout(cct, 20) << __func__ << " chunk belongs to none " << dendl;
+    assert(memory_manager->is_tx_chunk(chunk));
+    tx_chunks.push_back(chunk);
   }
 
-  if (n)
+  if (n) {
+    post_tx_buffer(tx_chunks);
+    tx_chunks.clear();
     goto again;
+  }
 
   if (!rearmed) {
     tx_cq->rearm_notify();
@@ -148,5 +202,113 @@ void RDMAWorker::handle_tx_event()
     // arrived between polling and rearm
     goto again;
   }
+
   ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl;
 }
+
+RDMADispatcher::~RDMADispatcher()
+{
+  done = true;
+  t.join();
+  assert(qp_conns.empty());
+  while (!dead_queue_pairs.empty()) {
+    delete dead_queue_pairs.back();
+    dead_queue_pairs.pop_back();
+  }
+
+  rx_cc->ack_events();
+  delete rx_cq;
+  delete rx_cc;
+  delete async_handler;
+}
+
+void RDMADispatcher::handle_async_event()
+{
+  ldout(cct, 20) << __func__ << dendl;
+  while (1) {
+    ibv_async_event async_event;
+    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
+      if (errno != EAGAIN)
+       lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+                  << " " << cpp_strerror(errno) << ")" << dendl;
+      return;
+    }
+    // FIXME: Currently we must ensure no other factor make QP in ERROR state,
+    // otherwise this qp can't be deleted in current cleanup flow.
+    if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+      uint64_t qpn = async_event.element.qp->qp_num;
+      ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+                     << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+      RDMAConnectedSocketImpl *conn = get_conn_by_qp(qpn);
+      if (!conn) {
+        ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+      } else {
+        ldout(cct, 0) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+        conn->fault();
+        erase_qpn(qpn);
+      }
+    } else {
+      ldout(cct, 0) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
+                    << " evt: " << ibv_event_type_str(async_event.event_type)
+                    << dendl;
+    }
+    ibv_ack_async_event(&async_event);
+  }
+}
+
+
+void RDMADispatcher::polling()
+{
+  static int MAX_COMPLETIONS = 32;
+  ibv_wc wc[MAX_COMPLETIONS];
+
+  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+  int i, n;
+  while (!done) {
+    n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+    if (!n) {
+      // NOTE: Has TX just transitioned to idle? We should do it when idle!
+      // It's now safe to delete queue pairs (see comment by declaration
+      // for dead_queue_pairs).
+      // Additionally, don't delete qp while outstanding_buffers isn't empty,
+      // because we need to check qp's state before sending
+      if (!inflight.load()) {
+        while (!dead_queue_pairs.empty()) {
+          ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
+          delete dead_queue_pairs.back();
+          dead_queue_pairs.pop_back();
+        }
+      }
+      handle_async_event();
+      continue;
+    }
+
+    ldout(cct, 20) << __func__ << " pool completion queue got " << n
+                   << " responses."<< dendl;
+    for (i = 0; i < n; ++i) {
+      ibv_wc* response = &wc[i];
+      Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+      ldout(cct, 20) << __func__ << " got chunk=" << response->wr_id << " qp:" << wc[i].qp_num << dendl;
+
+      if (response->status != IBV_WC_SUCCESS) {
+        lderr(cct) << __func__ << " work request returned error for buffer(" << response->wr_id
+                   << ") status(" << response->status << ":"
+                   << ib->wc_status_to_string(response->status) << dendl;
+        ib->post_chunk(chunk);
+        continue;
+      }
+
+      RDMAConnectedSocketImpl *conn = get_conn_by_qp(response->qp_num);
+      if (!conn) {
+        // discard buffer
+        ldout(cct, 0) << __func__ << " missing qp_num " << response->qp_num << ", discard bd "
+                      << chunk << dendl;
+        continue;
+      }
+      polled[conn].push_back(*response);
+    }
+    for (auto &&i : polled)
+      i.first->pass_wc(std::move(i.second));
+    polled.clear();
+  }
+}
index 2ff1c0d7929666f8e7354bcf9110334e47d8277d..a1f8158585194d30265c64c9716854f1bd0b2da2 100644 (file)
 #ifndef CEPH_MSG_RDMASTACK_H
 #define CEPH_MSG_RDMASTACK_H
 
+#include <sys/eventfd.h>
+
+#include <list>
+#include <vector>
+#include <thread>
+
 #include "common/ceph_context.h"
 #include "common/debug.h"
 #include "common/errno.h"
 #include "msg/async/Stack.h"
-#include <thread>
 #include "Infiniband.h"
 
 class RDMAConnectedSocketImpl;
+class RDMAStack;
+
+class RDMADispatcher {
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::QueuePair QueuePair;
+
+  std::thread t;
+  CephContext *cct;
+  Infiniband* ib;
+  Infiniband::CompletionQueue* rx_cq;           // common completion queue for all transmits
+  Infiniband::CompletionChannel* rx_cc;
+  EventCallbackRef async_handler;
+  bool done = false;
+  Mutex lock; // protect `qp_conns
+  // qp_num -> InfRcConnection
+  // The main usage of `qp_conns` is looking up connection by qp_num,
+  // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
+  //// make qp queue into dead state
+  /**
+   * 1. Connection call mark_down
+   * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
+   * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event)
+   * 4. Wait for CQ to be empty(handle_tx_event)
+   * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event)
+   *
+   * @param qp The qp needed to dead
+   */
+  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;
+
+  /// if a queue pair is closed when transmit buffers are active
+  /// on it, the transmit buffers never get returned via tx_cq.  To
+  /// work around this problem, don't delete queue pairs immediately. Instead,
+  /// save them in this vector and delete them at a safe time, when there are
+  /// no outstanding transmit buffers to be lost.
+  std::vector<QueuePair*> dead_queue_pairs;
+
+  class C_handle_cq_async : public EventCallback {
+    RDMADispatcher *dispatcher;
+   public:
+    C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
+    void do_request(int fd) {
+      // worker->handle_tx_event();
+      dispatcher->handle_async_event();
+    }
+  };
+
+ public:
+  std::atomic_ulong inflight;
+  explicit RDMADispatcher(CephContext* cct, Infiniband* i)
+    : t(&RDMADispatcher::polling, this), cct(cct), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock") {
+    rx_cc = ib->create_comp_channel();
+    assert(rx_cc);
+    rx_cq = ib->create_comp_queue(rx_cc);
+    assert(rx_cq);
+  }
+  virtual ~RDMADispatcher();
+
+  void handle_async_event();
+  void polling();
+  int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) {
+    int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+    assert(fd >= 0);
+    Mutex::Locker l(lock);
+    assert(!qp_conns.count(qp->get_local_qp_number()));
+    qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
+    return fd;
+  }
+  RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) {
+    Mutex::Locker l(lock);
+    auto it = qp_conns.find(qp);
+    if (it == qp_conns.end())
+      return NULL;
+    if (it->second.first->is_dead())
+      return NULL;
+    return it->second.second;
+  }
+  void erase_qpn(uint32_t qpn) {
+    Mutex::Locker l(lock);
+    auto it = qp_conns.find(qpn);
+    if (it == qp_conns.end())
+      return ;
+    dead_queue_pairs.push_back(it->second.first);
+    qp_conns.erase(it);
+  }
+};
+
 
 class RDMAWorker : public Worker {
   typedef Infiniband::CompletionQueue CompletionQueue;
   typedef Infiniband::CompletionChannel CompletionChannel;
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::MemoryManager MemoryManager;
-  int client_setup_socket;
-  Infinibandinfiniband;
-  CompletionQueuetx_cq;           // common completion queue for all transmits
-  CompletionChanneltx_cc;
+  RDMAStack *stack;
+  Infiniband *infiniband;
+  CompletionQueue *tx_cq;           // common completion queue for all transmits
+  CompletionChannel *tx_cc;
   EventCallbackRef tx_handler;
-  MemoryManager* memory_manager;
-  vector<RDMAConnectedSocketImpl*> to_delete;
+  MemoryManager *memory_manager;
+  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
+
   class C_handle_cq_tx : public EventCallback {
     RDMAWorker *worker;
     public:
@@ -48,8 +140,7 @@ class RDMAWorker : public Worker {
   };
 
  public:
-  explicit RDMAWorker(CephContext *c, unsigned i)
-    : Worker(c, i), infiniband(NULL), tx_handler(new C_handle_cq_tx(this)) {}
+  explicit RDMAWorker(CephContext *c, unsigned i);
   virtual ~RDMAWorker() {
     tx_cc->ack_events();
     delete tx_cq;
@@ -60,21 +151,16 @@ class RDMAWorker : public Worker {
   virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
   virtual void initialize() override;
-  void handle_tx_event();
   CompletionQueue* get_tx_cq() { return tx_cq; }
-  void remove_to_delete(RDMAConnectedSocketImpl* csi) {
-    if (to_delete.empty())
-      return ;
-    vector<RDMAConnectedSocketImpl*>::iterator iter = to_delete.begin();
-    for (; iter != to_delete.end(); ++iter) {
-      if(csi == *iter) {
-        to_delete.erase(iter);
-      }
-    }
+  RDMAStack *get_stack() {
+    return stack;
   }
-  void add_to_delete(RDMAConnectedSocketImpl* csi) {
-    to_delete.push_back(csi);
+  int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
+  int post_tx_buffer(std::vector<Chunk*> &chunks);
+  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
+    pending_sent_conns.remove(o);
   }
+  void handle_tx_event();
 };
 
 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
@@ -89,24 +175,54 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   IBSYNMsg peer_msg;
   IBSYNMsg my_msg;
   int connected;
+  int error;
   Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
   RDMAWorker* worker;
   std::vector<Chunk*> buffers;
-  CompletionChannel* rx_cc;
-  CompletionQueue* rx_cq;
-  bool wait_close;
+  int notify_fd;
+  bufferlist pending_bl;
+
+  Mutex lock;
+  std::vector<ibv_wc> wc;
+
+  void notify() {
+    uint64_t i = 1;
+    assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
+  }
+  ssize_t read_buffers(char* buf, size_t len);
+  int post_work_request(std::vector<Chunk*>&);
 
  public:
-  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMAWorker* w, IBSYNMsg im = IBSYNMsg()) : cct(cct), peer_msg(im), infiniband(ib), worker(w), wait_close(false) {
-    qp = infiniband->create_queue_pair(IBV_QPT_RC);
-    rx_cq = qp->get_rx_cq();
-    rx_cc = rx_cq->get_cc();
+  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                          RDMAWorker *w, IBSYNMsg im = IBSYNMsg())
+    : cct(cct), peer_msg(im), connected(0), error(0), infiniband(ib),
+      dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock") {
+    qp = infiniband->create_queue_pair(w->get_tx_cq(), IBV_QPT_RC);
     my_msg.qpn = qp->get_local_qp_number();
     my_msg.psn = qp->get_initial_psn();
     my_msg.lid = infiniband->get_lid();
     my_msg.gid = infiniband->get_gid();
+    notify_fd = dispatcher->register_qp(qp, this);
+  }
+  virtual ~RDMAConnectedSocketImpl() {
+    worker->remove_pending_conn(this);
   }
 
+  void pass_wc(std::vector<ibv_wc> &&v) {
+    Mutex::Locker l(lock);
+    if (wc.empty())
+      wc = std::move(v);
+    else
+      wc.insert(wc.end(), v.begin(), v.end());
+    notify();
+  }
+  void get_wc(std::vector<ibv_wc> &w) {
+    Mutex::Locker l(lock);
+    if (wc.empty())
+      return ;
+    w.swap(wc);
+  }
   virtual int is_connected() override {
     return connected;
   }
@@ -114,34 +230,32 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   virtual ssize_t zero_copy_read(bufferptr &data) override;
   virtual ssize_t send(bufferlist &bl, bool more) override;
   virtual void shutdown() override {
+    if (qp) {
+      qp->to_dead();
+      qp = NULL;
+    }
   }
   virtual void close() override {
-    if (!wait_close) {
-      fin();
-      worker->add_to_delete(this);
-    } else {
-      clear_all();
+    if (qp) {
+      qp->to_dead();
+      qp = NULL;
     }
   }
   virtual int fd() const override {
-    return rx_cc->get_fd();
+    return notify_fd;
   }
-  void clear_all() {
-    delete qp;
-    rx_cc->ack_events();
-    delete rx_cq;
-    rx_cq = NULL;
-    if (!wait_close)
-      worker->remove_to_delete(this);
+  void fault() {
+    if (qp) {
+      qp->to_dead();
+      qp = NULL;
+    }
+    error = ECONNRESET;
+    notify();
   }
+  ssize_t submit(bool more);
   int activate();
-  ssize_t read_buffers(char* buf, size_t len);
-  int poll_cq(int num_entries, ibv_wc *ret_wc_array);
   IBSYNMsg get_my_msg() { return my_msg; }
-  IBSYNMsg get_peer_msg() { return peer_msg; }
   void set_peer_msg(IBSYNMsg m) { peer_msg = m ;}
-  int post_work_request(std::vector<Chunk*>&);
-  void fin();
 };
 
 
@@ -150,13 +264,19 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
   NetHandler net;
   int server_setup_socket;
   Infiniband* infiniband;
+  RDMADispatcher *dispatcher;
+  RDMAWorker *worker;
   entity_addr_t sa;
+
  public:
-  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, entity_addr_t& a)
-    : cct(cct), net(cct), infiniband(i), sa(a) {}
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+    : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {}
   int listen(entity_addr_t &sa, const SocketOptions &opt);
   virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override;
-  virtual void abort_accept() override {}
+  virtual void abort_accept() override {
+    if (server_setup_socket >= 0)
+      ::close(server_setup_socket);
+  }
   virtual int fd() const override {
     return server_setup_socket;
   }
@@ -165,9 +285,11 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
+  RDMADispatcher *dispatcher;
 
  public:
   explicit RDMAStack(CephContext *cct, const string &t);
+  virtual ~RDMAStack() { delete dispatcher; }
   virtual bool support_zero_copy_read() const override { return true; }
   //virtual bool support_local_listen_table() const { return true; }
 
@@ -179,6 +301,7 @@ class RDMAStack : public NetworkStack {
     assert(threads.size() > i && threads[i].joinable());
     threads[i].join();
   }
+  RDMADispatcher *get_dispatcher() { return dispatcher; }
 };
 
 #endif