]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: use special Beacon to detect SQ WRs drained
authorRoman Penyaev <rpenyaev@suse.de>
Tue, 27 Aug 2019 08:44:27 +0000 (16:44 +0800)
committerChangcheng Liu <changcheng.liu@aliyun.com>
Mon, 16 Sep 2019 07:25:01 +0000 (15:25 +0800)
switch the QueuePair to error state, then post Beacon WR to
send queue. All outstanding WQEs will be flushed to CQ.
In CQ, check the completion queue element to detect SQ WRs has
been drained before destroying the QueuePair.
We don't post another Beacon WR to RQ if SRQ is not used/supported,
the reason is that QueuePair could be destroyed only under all
flushed WRs have been polled from CQ.

Refer to page 474 on below spec:
 InfiniBandTM Architecture Specification Volume 1, Release 1.3
link: https://cw.infinibandta.org/document/dl/7859
Signed-off-by: Roman Penyaev <rpenyaev@suse.de>
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

index ca3b646cd1c2d19afd9f53631d393f3f05294b03..eaa0ad3137c515d7db7b5a84405233ee5ce2c8d8 100644 (file)
@@ -171,7 +171,8 @@ Infiniband::QueuePair::QueuePair(
   txcq(txcq),
   rxcq(rxcq),
   initial_psn(lrand48() & PSN_MSK),
-  max_send_wr(tx_queue_len),
+  // One extra WR for beacon
+  max_send_wr(tx_queue_len + 1),
   max_recv_wr(rx_queue_len),
   q_key(q_key),
   dead(false)
@@ -447,14 +448,9 @@ retry:
 }
 
 /**
- * Change RC QueuePair into the ERROR state. This is necessary modify
- * the Queue Pair into the Error state and poll all of the relevant
- * Work Completions prior to destroying a Queue Pair.
- * Since destroying a Queue Pair does not guarantee that its Work
- * Completions are removed from the CQ upon destruction. Even if the
- * Work Completions are already in the CQ, it might not be possible to
- * retrieve them. If the Queue Pair is associated with an SRQ, it is
- * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
+ * Switch QP to ERROR state and then post a beacon to be able to drain all
+ * WCEs and then safely destroy QP. See RDMADispatcher::handle_tx_event()
+ * for details.
  *
  * \return
  *      -errno if the QueuePair can't switch to ERROR
@@ -464,19 +460,26 @@ int Infiniband::QueuePair::to_dead()
 {
   if (dead)
     return 0;
-  ibv_qp_attr qpa;
-  memset(&qpa, 0, sizeof(qpa));
-  qpa.qp_state = IBV_QPS_ERR;
 
-  int mask = IBV_QP_STATE;
-  int ret = ibv_modify_qp(qp, &qpa, mask);
-  if (ret) {
-    lderr(cct) << __func__ << " failed to transition to ERROR state: "
-               << cpp_strerror(errno) << dendl;
+  if (modify_qp_to_error()) {
+    return -1;
+  }
+  ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn
+                 << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;
+
+  struct ibv_send_wr *bad_wr = nullptr, beacon;
+  memset(&beacon, 0, sizeof(beacon));
+  beacon.wr_id = BEACON_WRID;
+  beacon.opcode = IBV_WR_SEND;
+  beacon.send_flags = IBV_SEND_SIGNALED;
+  if (ibv_post_send(qp, &beacon, &bad_wr)) {
+    lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
     return -errno;
   }
+  ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " Beacon sent " << dendl;
   dead = true;
-  return ret;
+
+  return 0;
 }
 
 int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
@@ -1064,7 +1067,8 @@ void Infiniband::init()
     ceph_abort();
   }
 
-  tx_queue_len = device->device_attr.max_qp_wr;
+  // Keep extra one WR for a beacon to indicate all WCEs were consumed
+  tx_queue_len = device->device_attr.max_qp_wr - 1;
   if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
     tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
     ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers"  << dendl;
index 57d5d98f781dcbc6ce28be0898b50be2cd09e37c..2c21077d2e7b1d576c12662d1763991b5ea98cca 100644 (file)
@@ -44,6 +44,8 @@
 #define PSN_LEN 24
 #define PSN_MSK ((1 << PSN_LEN) - 1)
 
+#define BEACON_WRID 0xDEADBEEF
+
 struct ib_cm_meta_t {
   uint16_t lid;
   uint32_t local_qpn;
index 9026b13dc8c0578a7095a0166cbdf133b41664d8..0112f8f3a886eb80fc60befa8de6329042ed0108 100644 (file)
@@ -42,7 +42,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
   ldout(cct, 20) << __func__ << " destruct." << dendl;
   cleanup();
   worker->remove_pending_conn(this);
-  dispatcher->erase_qpn(local_qpn);
+  dispatcher->schedule_qp_destroy(local_qpn);
 
   for (unsigned i=0; i < wc.size(); ++i) {
     dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
index f761e1fbff9fed6426f5a698a3658ac6890570ec..581eabf7e966df3d0b7fab87c98c4587fb9df0ed 100644 (file)
@@ -38,7 +38,6 @@ RDMADispatcher::~RDMADispatcher()
   ceph_assert(qp_conns.empty());
   ceph_assert(num_qp_conn == 0);
   ceph_assert(dead_queue_pairs.empty());
-  ceph_assert(num_dead_queue_pair == 0);
 
   delete async_handler;
 }
@@ -160,7 +159,7 @@ void RDMADispatcher::handle_async_event()
             ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
             conn->fault();
             if (!cct->_conf->ms_async_rdma_cm)
-              erase_qpn_lockless(qpn);
+              enqueue_dead_qp(qpn);
           }
         }
         break;
@@ -273,30 +272,31 @@ void RDMADispatcher::polling()
     }
 
     if (!tx_ret && !rx_ret) {
-      // NOTE: Has TX just transitioned to idle? We should do it when idle!
-      // It's now safe to delete queue pairs (see comment by declaration
-      // for dead_queue_pairs).
-      // Additionally, don't delete qp while outstanding_buffers isn't empty,
-      // because we need to check qp's state before sending
       perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
-      if (num_dead_queue_pair) {
-       std::lock_guard l{lock}; // FIXME reuse dead qp because creating one qp costs 1 ms
-        auto it = dead_queue_pairs.begin();
-        while (it != dead_queue_pairs.end()) {
-          auto i = *it;
-          // Bypass QPs that do not collect all Tx completions yet.
-          if (i->get_tx_wr()) {
-            ldout(cct, 20) << __func__ << " bypass qp=" << i << " tx_wr=" << i->get_tx_wr() << dendl;
-            ++it;
-          } else {
-            ldout(cct, 10) << __func__ << " finally delete qp=" << i << dendl;
-            delete i;
-            it = dead_queue_pairs.erase(it);
-            perf_logger->dec(l_msgr_rdma_active_queue_pair);
-            --num_dead_queue_pair;
-          }
+      //
+      // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
+      // we can destroy QPs even earlier, just when beacon has been received,
+      // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
+      // CQ before other WCs are fully consumed from rx CQ. For safety, we
+      // wait for beacon and then "no-events" from CQs.
+      //
+      // Calling size() on vector without locks is totally fine, since we
+      // use it as a hint (accuracy is not important here)
+      //
+      if (!dead_queue_pairs.empty()) {
+        decltype(dead_queue_pairs) dead_qps;
+        {
+          std::lock_guard l{lock};
+          dead_queue_pairs.swap(dead_qps);
+        }
+
+        for (auto& qp: dead_qps) {
+          perf_logger->dec(l_msgr_rdma_active_queue_pair);
+          ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
+          delete qp;
         }
       }
+
       if (!num_qp_conn && done && dead_queue_pairs.empty())
         break;
 
@@ -396,21 +396,44 @@ Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
   return get_qp_lockless(qp);
 }
 
-void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
+void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
 {
+  std::lock_guard l{lock};
   auto it = qp_conns.find(qpn);
-  if (it == qp_conns.end())
+  if (it == qp_conns.end()) {
+    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
     return ;
-  ++num_dead_queue_pair;
-  dead_queue_pairs.push_back(it->second.first);
+  }
+  QueuePair *qp = it->second.first;
+  dead_queue_pairs.push_back(qp);
   qp_conns.erase(it);
   --num_qp_conn;
 }
 
-void RDMADispatcher::erase_qpn(uint32_t qpn)
+void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
 {
   std::lock_guard l{lock};
-  erase_qpn_lockless(qpn);
+  auto it = qp_conns.find(qpn);
+  if (it == qp_conns.end()) {
+    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
+    return;
+  }
+  QueuePair *qp = it->second.first;
+  if (qp->to_dead()) {
+    //
+    // Failed to switch to dead. This is abnormal, but we can't
+    // do anything, so just destroy QP.
+    //
+    dead_queue_pairs.push_back(qp);
+    qp_conns.erase(it);
+    --num_qp_conn;
+  } else {
+    //
+    // Successfully switched to dead, thus keep entry in the map.
+    // But only zero out socked pointer in order to return null from
+    // get_conn_lockless();
+    it->second.second = nullptr;
+  }
 }
 
 void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
@@ -420,6 +443,13 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
   for (int i = 0; i < n; ++i) {
     ibv_wc* response = &cqe[i];
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+
+    // If it's beacon WR, enqueue the QP to be destroyed later
+    if (response->wr_id == BEACON_WRID) {
+      enqueue_dead_qp(response->qp_num);
+      continue;
+    }
+
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                    << " len: " << response->byte_len << " , addr:" << chunk
                    << " " << ib->wc_status_to_string(response->status) << dendl;
index 8cd8e194dd420b1f3a4e2ba7f7496fa4b78682ec..5522e8a363b4b2d1aaaf1ac62addf09680f12bed 100644 (file)
@@ -46,7 +46,6 @@ class RDMADispatcher {
   Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
   EventCallbackRef async_handler;
   bool done = false;
-  std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
   // protect `qp_conns`, `dead_queue_pairs`
   ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
@@ -57,9 +56,9 @@ class RDMADispatcher {
   /**
    * 1. Connection call mark_down
    * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
-   * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event)
-   * 4. Wait for CQ to be empty(handle_tx_event)
-   * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event)
+   * 3. Post a beacon
+   * 4. Wait for beacon which indicates queues are drained
+   * 5. Destroy the QP by calling ibv_destroy_qp()
    *
    * @param qp The qp needed to dead
    */
@@ -78,6 +77,7 @@ class RDMADispatcher {
     ceph::make_mutex("RDMADispatcher::for worker pending list");
   // fixme: lockfree
   std::list<RDMAWorker*> pending_workers;
+  void enqueue_dead_qp(uint32_t qp);
 
   class C_handle_cq_async : public EventCallback {
     RDMADispatcher *dispatcher;
@@ -111,8 +111,7 @@ class RDMADispatcher {
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
   QueuePair* get_qp_lockless(uint32_t qp);
   QueuePair* get_qp(uint32_t qp);
-  void erase_qpn_lockless(uint32_t qpn);
-  void erase_qpn(uint32_t qpn);
+  void schedule_qp_destroy(uint32_t qp);
   Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
   Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();