msg/async/rdma: support QPs that are not associated with an SRQ
author     haodong tang <haodong.tang@intel.com>
           Wed, 4 Apr 2018 05:43:56 +0000 (13:43 +0800)
committer  haodong <haodong.tang@intel.com>
           Wed, 20 Jun 2018 06:08:26 +0000 (14:08 +0800)
Signed-off-by: Haodong Tang <haodong.tang@intel.com>
src/common/legacy_config_opts.h
src/common/options.cc
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index 02638506b6921360903ca22e4d7fe25ffaee29f7..38320ff4ea2b8eb671096f3874fff38640ab0c38 100644 (file)
@@ -161,6 +161,8 @@ OPTION(ms_async_rdma_send_buffers, OPT_U32)
 OPTION(ms_async_rdma_receive_buffers, OPT_U32)
 // max number of wr in srq
 OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
+// support srq
+OPTION(ms_async_rdma_support_srq, OPT_BOOL)
 OPTION(ms_async_rdma_port_num, OPT_U32)
 OPTION(ms_async_rdma_polling_us, OPT_U32)
 OPTION(ms_async_rdma_local_gid, OPT_STR)       // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
index b8ed5e79be62601bea04f764887476bf7d9abf00..2ed343a69726448b587f6174e4ca90fe8037ced1 100644 (file)
@@ -1024,6 +1024,10 @@ std::vector<Option> get_global_options() {
     .set_default(4096)
     .set_description(""),
 
+    Option("ms_async_rdma_support_srq", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+    .set_default(true)
+    .set_description(""),
+
     Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(1)
     .set_description(""),
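
For reference, the new option is a boolean that defaults to true (keep using a shared receive queue). A minimal ceph.conf sketch for disabling it, assuming the async+rdma messenger is already configured; the section placement and values are illustrative:

[global]
        ms_type = async+rdma
        # fall back to per-QP receive queues instead of a shared receive queue
        ms_async_rdma_support_srq = false
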
index cfbd3680364c8e69cee67d47465e9c413c5e5f81..3458bd2b53815bcbfe0beb503ce20df4d8bac5f4 100644 (file)
@@ -189,7 +189,12 @@ int Infiniband::QueuePair::init()
   memset(&qpia, 0, sizeof(qpia));
   qpia.send_cq = txcq->get_cq();
   qpia.recv_cq = rxcq->get_cq();
-  qpia.srq = srq;                      // use the same shared receive queue
+  if (srq) {
+    qpia.srq = srq;                      // use the same shared receive queue
+  } else {
+    qpia.cap.max_recv_wr = max_recv_wr;
+    qpia.cap.max_recv_sge = 1;
+  }
   qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests
   qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
   qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q
@@ -908,7 +913,11 @@ void Infiniband::init()
   pd = new ProtectionDomain(cct, device);
   assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
 
-  rx_queue_len = device->device_attr->max_srq_wr;
+  support_srq = cct->_conf->ms_async_rdma_support_srq;
+  if (support_srq)
+    rx_queue_len = device->device_attr->max_srq_wr;
+  else
+    rx_queue_len = device->device_attr->max_qp_wr;
   if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
     rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
     ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
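
In other words, the receive-queue depth is now capped by whichever device limit applies. A minimal C++ sketch of that selection; the helper name choose_rx_queue_len is illustrative, not part of the patch:

#include <infiniband/verbs.h>
#include <algorithm>
#include <cstdint>

// Pick the receive queue depth: bounded by the device limit that applies
// (max_srq_wr when an SRQ is used, max_qp_wr otherwise) and by the
// configured ms_async_rdma_receive_queue_len, whichever is smaller.
static uint32_t choose_rx_queue_len(const ibv_device_attr &attr,
                                    bool support_srq,
                                    uint32_t configured_len)
{
  uint32_t dev_limit =
      static_cast<uint32_t>(support_srq ? attr.max_srq_wr : attr.max_qp_wr);
  return std::min(dev_limit, configured_len);
}
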
@@ -941,17 +950,18 @@ void Infiniband::init()
   memory_manager = new MemoryManager(cct, device, pd);
   memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
 
-  srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
-
-  post_chunks_to_srq(rx_queue_len); //add to srq
+  if (support_srq) {
+    srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
+    post_chunks_to_rq(rx_queue_len, NULL); //add to srq
+  }
 }
 
 Infiniband::~Infiniband()
 {
   if (!initialized)
     return;
-
-  ibv_destroy_srq(srq);
+  if (support_srq)
+    ibv_destroy_srq(srq);
   delete memory_manager;
   delete pd;
 }
@@ -1003,7 +1013,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio
   return qp;
 }
 
-int Infiniband::post_chunks_to_srq(int num)
+int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
 {
   int ret, i = 0;
   ibv_sge isge[num];
@@ -1038,8 +1048,14 @@ int Infiniband::post_chunks_to_srq(int num)
     i++;
   }
   ibv_recv_wr *badworkrequest;
-  ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
-  assert(ret == 0);
+  if (support_srq) {
+    ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
+    assert(ret == 0);
+  } else {
+    assert(qp);
+    ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
+    assert(ret == 0);
+  }
   return i;
 }
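
The verbs-level split behind post_chunks_to_rq can be seen in isolation. A minimal C++ sketch, assuming a registered memory region and an already-created QP or SRQ; post_one_recv and its parameters are illustrative, not part of the patch:

#include <infiniband/verbs.h>
#include <cassert>
#include <cstring>

// Illustrative helper (not the patch itself): post one receive buffer either
// to a shared receive queue or to the QP's own receive queue. The buffer must
// belong to a registered memory region (mr), and a QP without an SRQ must have
// been created with a non-zero cap.max_recv_wr.
static int post_one_recv(ibv_qp *qp, ibv_srq *srq, ibv_mr *mr,
                         void *buf, uint32_t len)
{
  ibv_sge sge;
  memset(&sge, 0, sizeof(sge));
  sge.addr   = reinterpret_cast<uint64_t>(buf);
  sge.length = len;
  sge.lkey   = mr->lkey;

  ibv_recv_wr wr;
  memset(&wr, 0, sizeof(wr));
  wr.wr_id   = reinterpret_cast<uint64_t>(buf); // echoed back in the completion
  wr.sg_list = &sge;
  wr.num_sge = 1;

  ibv_recv_wr *bad = nullptr;
  if (srq)
    return ibv_post_srq_recv(srq, &wr, &bad);   // shared receive queue path
  assert(qp);
  return ibv_post_recv(qp, &wr, &bad);          // per-QP receive queue path
}

Because a QP without an SRQ starts with an empty receive queue, the patch pre-posts get_rx_queue_len() buffers right after QP creation (see the alloc_resource() hunk below); with the default ms_async_rdma_support_srq = true the SRQ path remains unchanged.
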
 
index 0a9daf5fefb15f10aaab6ac273311fb192f80172..8b4684bcf4ef744b33a9684d37880afeb8254eb4 100644 (file)
@@ -350,6 +350,7 @@ class Infiniband {
     MemPoolContext rxbuf_pool_ctx;
     mem_pool     rxbuf_pool;
 
+
     void* huge_pages_malloc(size_t size);
     void  huge_pages_free(void *ptr);
   };
@@ -371,6 +372,7 @@ class Infiniband {
   bool initialized = false;
   const std::string &device_name;
   uint8_t port_num;
+  bool support_srq = false;
 
  public:
   explicit Infiniband(CephContext *c);
@@ -504,7 +506,7 @@ class Infiniband {
       ibv_qp_type type, struct rdma_cm_id *cm_id);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
   // post rx buffers to srq, return number of buffers actually posted
-  int  post_chunks_to_srq(int num);
+  int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
   void post_chunk_to_pool(Chunk* chunk) {
     get_memory_manager()->release_rx_buffer(chunk);
   }
@@ -523,6 +525,7 @@ class Infiniband {
   Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
+  uint32_t get_rx_queue_len() const { return rx_queue_len; }
 };
 
 #endif
index 3680045e8f2dcd6708cae5c68df02ca4ff4998a5..10ff2416c894e58a28aa96640af74562931a0f61 100644 (file)
@@ -317,6 +317,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
       } else {
         read += chunk->read(buf+read, response->byte_len);
         dispatcher->post_chunk_to_pool(chunk);
+        update_post_backlog();
       }
     }
   }
@@ -348,6 +349,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
     ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;
     if ((*c)->over()) {
       dispatcher->post_chunk_to_pool(*c);
+      update_post_backlog();
       ldout(cct, 25) << __func__ << " one chunk over." << dendl;
     }
     if (read == len) {
@@ -659,3 +661,14 @@ void RDMAConnectedSocketImpl::set_accept_fd(int sd)
                           worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
                           }, true);
 }
+
+void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
+{
+  post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
+}
+
+void RDMAConnectedSocketImpl::update_post_backlog()
+{
+  if (post_backlog)
+    post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
+}
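
The intent of the two new helpers is backlog accounting: remember how many receive buffers could not be posted, and retry once buffers are returned to the pool. A simplified standalone model of that intent (not the patch's exact arithmetic; RxPoster and try_post are hypothetical stand-ins for the socket and Infiniband::post_chunks_to_rq):

#include <functional>

// Simplified model: try_post(n) attempts to post n receive buffers and
// returns how many were actually posted (it may fall short when the rx
// buffer pool is exhausted).
struct RxPoster {
  int post_backlog = 0;
  std::function<int(int)> try_post;

  // Replenish `num` buffers; whatever could not be posted is remembered.
  void post_chunks(int num) {
    post_backlog += num - try_post(num);
  }

  // Called after a buffer is released back to the pool: retry the backlog.
  void update_backlog() {
    if (post_backlog)
      post_backlog -= try_post(post_backlog);
  }
};
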
index 672b876afafb188f70a932eb06cbeb875f24d431..d687ca8c769b845733a44281981624e379eb75e1 100644 (file)
@@ -19,14 +19,14 @@ RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Inf
     cm_channel = info->cm_channel;
     status = RDMA_ID_CREATED;
     remote_qpn = info->qp_num;
-    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
-      status = CHANNEL_FD_CREATED;
-    }, false);
     if (alloc_resource()) {
       close_notify();
       return;
     }
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+      status = CHANNEL_FD_CREATED;
+    }, false);
     status = RESOURCE_ALLOCATED;
     local_qpn = qp->get_local_qp_number();
     my_msg.qpn = local_qpn;
@@ -62,8 +62,9 @@ int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, co
 void RDMAIWARPConnectedSocketImpl::close() {
   error = ECONNRESET;
   active = false;
-  if (status >= CONNECTED)
+  if (status >= CONNECTED) {
     rdma_disconnect(cm_id);
+  }
   close_notify();
 }
 
@@ -109,6 +110,7 @@ void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
       break;
 
     case RDMA_CM_EVENT_ESTABLISHED:
+      ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
       status = CONNECTED;
       if (!is_server) {
         remote_qpn = event->param.conn.qp_num;
@@ -159,6 +161,8 @@ int RDMAIWARPConnectedSocketImpl::alloc_resource() {
   if (!qp) {
     return -1;
   }
+  if (!cct->_conf->ms_async_rdma_support_srq)
+    dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
   dispatcher->register_qp(qp, this);
   dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
   dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
index 5523f2289e422d77f37f484920b44667f1e63392..862131dee66b80de18a75bcac3dbd6cc07f5b17f 100644 (file)
@@ -157,16 +157,17 @@ void RDMADispatcher::handle_async_event()
   }
 }
 
-void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
+void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
+{
   Mutex::Locker l(lock);
   get_stack()->get_infiniband().post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-  // handle a case when we have a limited number of
-  // rx buffers and we could not post a required amount when polling
-  if (post_backlog > 0) {
-    ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl;
-    post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog);
-  }
+}
+
+int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
+{
+  Mutex::Locker l(lock);
+  return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
 }
 
 void RDMADispatcher::polling()
@@ -199,36 +200,33 @@ void RDMADispatcher::polling()
 
       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
 
-      post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
-
       for (int i = 0; i < rx_ret; ++i) {
         ibv_wc* response = &wc[i];
         Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-        ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
-
-        assert(wc[i].opcode == IBV_WC_RECV);
 
         if (response->status == IBV_WC_SUCCESS) {
+          assert(wc[i].opcode == IBV_WC_RECV);
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
             get_stack()->get_infiniband().post_chunk_to_pool(chunk);
             perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
           } else {
-           polled[conn].push_back(*response);
+            conn->post_chunks_to_rq(1);
+            polled[conn].push_back(*response);
           }
         } else {
           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
-          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
               << ") status(" << response->status << ":"
               << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
-          conn = get_conn_lockless(response->qp_num);
-          if (conn && conn->is_connected())
-            conn->fault();
-
+          if (response->status != IBV_WC_WR_FLUSH_ERR) {
+            conn = get_conn_lockless(response->qp_num);
+            if (conn && conn->is_connected())
+              conn->fault();
+          }
           get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
         }
       }
       for (auto &&i : polled)
@@ -401,16 +399,15 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
                       << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
-      }
-
-      Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-      RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+        Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+        RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
 
-      if (conn && conn->is_connected()) {
-        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
-        conn->fault();
-      } else {
-        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+        if (conn && conn->is_connected()) {
+          ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
+          conn->fault();
+        } else {
+          ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+        }
       }
     }
 
index f2965b22e32b0da91e42c4ab2ba348eabfca1f62..e38284d4552a728e70fe9887b648c36d29c605cc 100644 (file)
@@ -47,7 +47,6 @@ class RDMADispatcher {
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
-  int post_backlog = 0;
   Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
   // qp_num -> InfRcConnection
   // The main usage of `qp_conns` is looking up connection by qp_num,
@@ -119,8 +118,8 @@ class RDMADispatcher {
 
   std::atomic<uint64_t> inflight = {0};
 
-  void post_chunk_to_pool(Chunk* chunk); 
-
+  void post_chunk_to_pool(Chunk* chunk);
+  int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
 };
 
 class RDMAWorker : public Worker {
@@ -199,6 +198,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   int tcp_fd = -1;
   bool active;// qp is active ?
   bool pending;
+  int post_backlog = 0;
 
   void notify();
   ssize_t read_buffers(char* buf, size_t len);
@@ -230,6 +230,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
   virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
   bool is_pending() {return pending;}
   void set_pending(bool val) {pending = val;}
+  void post_chunks_to_rq(int num);
+  void update_post_backlog();
 
   class C_handle_connection : public EventCallback {
     RDMAConnectedSocketImpl *csi;