Reduce lock spam in XioPortal SubmitQueue.
author Matt Benjamin <matt@cohortfs.com>
Tue, 16 Dec 2014 16:38:06 +0000 (11:38 -0500)
committer Matt Benjamin <matt@cohortfs.com>
Wed, 14 Jan 2015 21:44:17 +0000 (16:44 -0500)
In SubmitQueue::deq, scan up to nlanes lanes, exiting immediately when
work is found and remembering the last lane scanned.

Yield to Accelio iff a full scan finds no work.
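
A minimal C++ sketch of the round-robin dequeue idea, assuming a simplified
Lane type and a placeholder lane count; the real code uses XioSubmit::Queue,
pthread spinlocks, and the portal thread as the only caller of deq().

    #include <array>
    #include <deque>
    #include <mutex>

    struct Lane {
      std::mutex sp;        // stands in for pthread_spinlock_t
      std::deque<int> q;    // stands in for XioSubmit::Queue
    };

    struct SubmitQueue {
      static constexpr int nlanes = 7;  // placeholder count for the sketch
      std::array<Lane, nlanes> qlane;
      int ix = 0;  // last lane scanned; touched only by the portal thread

      // Scan at most nlanes lanes, starting where the previous scan stopped.
      // Exit on the first lane with work, so a pass that finds work does not
      // keep taking the remaining lanes' spinlocks (the lock-spam reduction).
      void deq(std::deque<int>& send_q) {
        for (int cnt = 0; cnt < nlanes; ++cnt, ix = (ix + 1) % nlanes) {
          Lane& lane = qlane[ix];
          std::lock_guard<std::mutex> lk(lane.sp);
          if (!lane.q.empty()) {
            send_q.insert(send_q.end(), lane.q.begin(), lane.q.end());
            lane.q.clear();
            ix = (ix + 1) % nlanes;  // resume after this lane next time
            break;                   // yield back to the caller immediately
          }
        }
      }
    };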

xio: avoid starving run loop, don't stop loop in shutdown()
     Fix 2 issues flagged in review by Alex Rosenbaum.
     * In the XioPortal main loop, the recent lock-contention reduction also
       made it easier to starve Accelio under steady send work.  Restore the
       original behavior (see the loop sketch after this list).
     * Remove the call to xio_context_stop_loop() in XioPortal::shutdown(),
       to ensure that Accelio can finish cleaning up.
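
Illustrative sketch only, with stand-in types and functions rather than the
actual XioPortal::entry(): the point is that the Accelio event loop gets a
bounded time slice on every pass, even when deq() returned work, and that
shutdown() merely flips a flag.

    #include <atomic>
    #include <deque>

    struct FakeCtx {};                                       // stand-in for struct xio_context
    static void run_accelio_loop(FakeCtx*, int /*ms*/) {}    // stand-in for xio_context_run_loop()
    static void send_all(std::deque<int>& q) { q.clear(); }  // stand-in for the xio_send_msg() path

    void portal_loop(FakeCtx* ctx, std::atomic<bool>& shutdown,
                     std::deque<int>& submit_q) {
      std::deque<int> send_q;
      while (!shutdown) {
        send_q.swap(submit_q);  // the real code calls SubmitQueue::deq() under a spinlock
        send_all(send_q);
        // Always yield a bounded slice to Accelio, whether or not work was found.
        run_accelio_loop(ctx, 300);
      }
      // shutdown() now only flips the flag; without xio_context_stop_loop()
      // the loop exits after the current slice and Accelio can finish cleanup.
    }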

Move the queue guard check.
Release the spinlock.
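
A small sketch of the guard condition with stand-in types; the real check
compares xcon->send_ctr + xmsg->hdr.msg_cnt against the connection's Accelio
queue-depth high-water mark before xio_send_msg(), and it moves inside the
OUTGOING_MSG case because only outgoing XioMsg submissions add to the
Accelio send queue.

    struct ConnSketch { unsigned send_ctr; unsigned qdepth_high; };
    struct MsgSketch  { unsigned msg_cnt; };  // chained xio_msg count

    // True if submitting this chained message would exceed the high-water
    // mark; the caller then requeues the connection's messages and restarts
    // the scan instead of letting Accelio reject the chain part-way through.
    static bool would_overflow(const ConnSketch& c, const MsgSketch& m) {
      return c.send_ctr + m.msg_cnt > c.qdepth_high;
    }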

Signed-off-by: Matt Benjamin <matt@cohortfs.com>
src/msg/xio/XioPortal.h

index 5932d56dc08758a5f431b6d47005f4395eee39d6..efeaad7df881d72b449e2cc727c8ad262cfd5f81 100644 (file)
@@ -51,7 +51,9 @@ private:
 
     Lane qlane[nlanes];
 
-    SubmitQueue()
+    int ix; /* atomicity by portal thread */
+
+    SubmitQueue() : ix(0)
       {
        int ix;
        Lane* lane;
@@ -90,22 +92,24 @@ private:
 
     void deq(XioSubmit::Queue& send_q)
       {
-       int ix;
        Lane* lane;
-
-       for (ix = 0; ix < nlanes; ++ix) {
+       int cnt;
+       for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
          lane = &qlane[ix];
          pthread_spin_lock(&lane->sp);
          if (lane->size > 0) {
            XioSubmit::Queue::const_iterator i1 = send_q.end();
            send_q.splice(i1, lane->q);
            lane->size = 0;
+           ++ix, ix = ix % nlanes;
+           pthread_spin_unlock(&lane->sp);
+           break;
          }
          pthread_spin_unlock(&lane->sp);
        }
       }
 
-  };
+  }; /* SubmitQueue */
 
   Messenger *msgr;
   struct xio_context *ctx;
@@ -252,23 +256,23 @@ public:
          while (q_iter != send_q.end()) {
            xs = &(*q_iter);
            xcon = xs->xcon;
-           xmsg = static_cast<XioMsg*>(xs);
-
-           /* guard Accelio send queue */
-           xio_qdepth_high = xcon->xio_qdepth_high_mark();
-           if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
-                        xio_qdepth_high)) {
-             requeue_all_xcon(xmsg, xcon, q_iter, send_q);
-             goto restart;
-           }
-
-           q_iter = send_q.erase(q_iter);
 
            switch (xs->type) {
            case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+             xmsg = static_cast<XioMsg*>(xs);
              if (unlikely(!xs->xcon->conn))
                code = ENOTCONN;
              else {
+                /* XXX guard Accelio send queue (should be safe to rely
+                 * on Accelio's check on below, but this assures that
+                 * all chained xio_msg are accounted) */
+                xio_qdepth_high = xcon->xio_qdepth_high_mark();
+                if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
+                              xio_qdepth_high)) {
+                  requeue_all_xcon(xmsg, xcon, q_iter, send_q);
+                  goto restart;
+                }
+
                msg = &xmsg->req_0.msg;
                code = xio_send_msg(xcon->conn, msg);
                /* header trace moved here to capture xio serial# */
@@ -276,7 +280,7 @@ public:
                  print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
                  print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
                }
-             }
+             } /* !ENOTCONN */
              if (unlikely(code)) {
                switch (code) {
                case XIO_E_TX_QUEUE_OVERFLOW:
@@ -296,11 +300,13 @@ public:
              break;
            default:
              /* INCOMING_MSG_RELEASE */
+              q_iter = send_q.erase(q_iter);
              release_xio_rsp(static_cast<XioRsp*>(xs));
-             break;
-           }
-         }
-       }
+             continue;
+           } /* switch (xs->type) */
+            q_iter = send_q.erase(q_iter);
+         } /* while */
+       } /* size > 0 */
 
        pthread_spin_unlock(&sp);
        xio_context_run_loop(ctx, 300);
@@ -318,7 +324,6 @@ public:
   void shutdown()
     {
        pthread_spin_lock(&sp);
-       xio_context_stop_loop(ctx);
        _shutdown = true;
        pthread_spin_unlock(&sp);
     }