]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
xio: Enable Accelio flow control with msgs and bytes throttlers
authorVu Pham <vu@mellanox.com>
Thu, 11 Dec 2014 14:28:26 +0000 (06:28 -0800)
committerMatt Benjamin <matt@cohortfs.com>
Wed, 14 Jan 2015 21:44:00 +0000 (16:44 -0500)
* Enable Accelio flow control in general
* Read out policy for messages and bytes throttlers from connection's peer_type
* Set Accelio connection flow control with policy throttlers or default values
* Set q_high_mark for xio_connection (80% of queue_depth)
  xio: Correct q_high_mark setting

Signed-off-by: Vu Pham <vu@mellanox.com>
Signed-off-by: Matt Benjamin <matt@cohortfs.com>
src/msg/xio/XioConnection.cc
src/msg/xio/XioConnection.h
src/msg/xio/XioMessenger.cc
src/msg/xio/XioPortal.h

index 207c8ee2d2f20c2954a4c6ce659b5e8c212f1426..b0a0ce5252625ef4c53c53aa4b4e5ef27011cbc5 100644 (file)
@@ -97,6 +97,49 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
   peer_type = peer.name.type();
   set_peer_addr(peer.addr);
 
+  Messenger::Policy policy;
+  int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
+  int xopt;
+
+  policy = m->get_policy(peer_type);
+
+  if (policy.throttler_messages) {
+    max_msgs = policy.throttler_messages->get_max();
+    ldout(m->cct,0) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
+  }
+
+  xopt = m->cct->_conf->xio_queue_depth;
+  if (max_msgs > xopt)
+    xopt = max_msgs;
+
+  /* set high mark for send, reserved 20% for credits */
+  q_high_mark = xopt * 4 / 5;
+  q_low_mark = q_high_mark/2;
+
+  /* set send & receive msgs queue depth */
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
+             &xopt, sizeof(xopt));
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
+             &xopt, sizeof(xopt));
+
+  if (policy.throttler_bytes) {
+    max_bytes = policy.throttler_bytes->get_max();
+    ldout(m->cct,0) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
+  }
+
+  bytes_opt = (2 << 28); /* default: 512 MB */
+  if (max_bytes > bytes_opt)
+    bytes_opt = max_bytes;
+
+  /* set send & receive total bytes throttle */
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
+             &bytes_opt, sizeof(bytes_opt));
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
+             &bytes_opt, sizeof(bytes_opt));
+
+  ldout(m->cct,0) << "Peer type: " << peer.name.type_str() <<
+        " throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;
+
   /* XXXX fake features, aieee! */
   set_features(XIO_ALL_FEATURES);
 }
index 28f63c9e24df89b6bd70191863ade38eca966219..2cd9d3499dbe44a2d6cb8f882de31c9d6b61d72d 100644 (file)
@@ -55,6 +55,7 @@ private:
   uint32_t special_handling;
   uint64_t scount;
   uint32_t send_ctr;
+  int q_high_mark;
 
   struct lifecycle {
     // different from Pipe states?
@@ -139,8 +140,8 @@ private:
     return 0;
   }
 
-  int xio_queue_depth() {
-    return msgr->cct->_conf->xio_queue_depth;
+  int xio_qdepth_high_mark() {
+    return q_high_mark;
   }
 
 public:
index 04fa4b21ad3c58ca6f820eba7cba79c15e89e704..f9f5799c0eccfbba8a38b23727d1805876e2a9f3 100644 (file)
@@ -289,11 +289,10 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
                  &xopt, sizeof(xopt));
 
-      xopt = cct->_conf->xio_queue_depth; // defaults to 512
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
-                 &xopt, sizeof(xopt));
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
-                 &xopt, sizeof(xopt));
+      /* enable flow-control */
+      xopt = 1;
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
+                 &xopt, sizeof(xopt));
 
       /* and set threshold for buffer callouts */
       xopt = 16384;
index 7e8ff16e9afb51d2dbab4173dee08209764acecb..0090e6227e213d06629c4c81949a65694bcc4e73 100644 (file)
@@ -181,7 +181,7 @@ public:
   void *entry()
     {
       int size, code = 0;
-      uint32_t xio_qdepth;
+      uint32_t xio_qdepth_high;
       XioSubmit::Queue send_q;
       XioSubmit::Queue::iterator q_iter;
       struct xio_msg *msg = NULL;
@@ -208,8 +208,8 @@ public:
             xmsg = static_cast<XioMsg*>(xs);
 
             /* guard Accelio send queue */
-            xio_qdepth = xcon->xio_queue_depth();
-            if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth)) {
+            xio_qdepth_high = xcon->xio_qdepth_high_mark();
+            if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth_high)) {
               ++q_iter;
               continue;
             }