]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: cut the middle-man 7625/head
authorPiotr Dałek <piotr.dalek@ts.fujitsu.com>
Thu, 11 Feb 2016 08:38:46 +0000 (09:38 +0100)
committerPiotr Dałek <piotr.dalek@ts.fujitsu.com>
Tue, 23 Feb 2016 14:52:42 +0000 (15:52 +0100)
Get rid of complete_bl and let messenger write directly to outcoming_bl of
a connection. Also, if message bufferlist is small enough, append its
contents to outcoming_bl directly, so we'll use less iovecs and in best
case, pack entire message (together with header and footer added in
write_message()) in single bufferptr.

Signed-off-by: Piotr Dałek <piotr.dalek@ts.fujitsu.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index aafe894552ce81077a596a1fd0517613f1542b4f..cd913c9df3704af9ec3443832cfa47322764e63d 100644 (file)
@@ -45,6 +45,7 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
 // 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
 
 const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
+const int ASYNC_COALESCE_THRESHOLD = 256;
 
 class C_time_wakeup : public EventCallback {
   AsyncConnectionRef conn;
@@ -344,13 +345,8 @@ ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more)
 
 // return the remaining bytes, it may larger than the length of ptr
 // else return < 0 means error
-ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send, bool more)
+ssize_t AsyncConnection::_try_send(bool send, bool more)
 {
-  ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl;
-  if (send_bl.length()) {
-    outcoming_bl.claim_append(send_bl);
-  }
-
   if (!send)
     return 0;
 
@@ -2338,14 +2334,13 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
                                  << "): sig = " << footer.sig << dendl;
     }
   }
+  
+  unsigned original_bl_len = outcoming_bl.length();
 
-  bufferlist complete_bl;
-  // send tag
-  char tag = CEPH_MSGR_TAG_MSG;
-  complete_bl.append(&tag, sizeof(tag));
+  outcoming_bl.append(CEPH_MSGR_TAG_MSG);
 
   if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
-    complete_bl.append((char*)&header, sizeof(header));
+    outcoming_bl.append((char*)&header, sizeof(header));
   } else {
     ceph_msg_header_old oldheader;
     memcpy(&oldheader, &header, sizeof(header));
@@ -2355,7 +2350,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
     oldheader.reserved = header.reserved;
     oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
                                 sizeof(oldheader) - sizeof(oldheader.crc));
-    complete_bl.append((char*)&oldheader, sizeof(oldheader));
+    outcoming_bl.append((char*)&oldheader, sizeof(oldheader));
   }
 
   ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
@@ -2364,12 +2359,19 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
                              << " data=" << header.data_len
                              << " off " << header.data_off << dendl;
 
-  complete_bl.claim_append(bl);
+  if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
+    std::list<buffer::ptr>::const_iterator pb;
+    for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) {
+      outcoming_bl.append((char*)pb->c_str(), pb->length());
+    }
+  } else {
+    outcoming_bl.claim_append(bl);  
+  }
 
   // send footer; if receiver doesn't support signatures, use the old footer format
   ceph_msg_footer_old old_footer;
   if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
-    complete_bl.append((char*)&footer, sizeof(footer));
+    outcoming_bl.append((char*)&footer, sizeof(footer));
   } else {
     if (msgr->crcflags & MSG_CRC_HEADER) {
       old_footer.front_crc = footer.front_crc;
@@ -2380,13 +2382,13 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
     }
     old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
     old_footer.flags = footer.flags;
-    complete_bl.append((char*)&old_footer, sizeof(old_footer));
+    outcoming_bl.append((char*)&old_footer, sizeof(old_footer));
   }
 
-  logger->inc(l_msgr_send_bytes, complete_bl.length());
+  logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len);
   ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
                              << " " << m << dendl;
-  ssize_t rc = _try_send(complete_bl, true, more);
+  ssize_t rc = _try_send(true, more);
   if (rc < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
                               << cpp_strerror(errno) << dendl;
@@ -2435,7 +2437,6 @@ void AsyncConnection::mark_down()
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
   assert(write_lock.is_locked());
-  bufferlist bl;
 
   utime_t t = ceph_clock_now(async_msgr->cct);
   struct ceph_timespec ts;
@@ -2443,25 +2444,24 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
   if (ack) {
     assert(tp);
     tp->encode_timeval(&ts);
-    bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
-    bl.append((char*)&ts, sizeof(ts));
+    outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+    outcoming_bl.append((char*)&ts, sizeof(ts));
   } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
     struct ceph_timespec ts;
     t.encode_timeval(&ts);
-    bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
-    bl.append((char*)&ts, sizeof(ts));
+    outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
+    outcoming_bl.append((char*)&ts, sizeof(ts));
   } else {
-    bl.append(CEPH_MSGR_TAG_KEEPALIVE);
+    outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
   }
 
   ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
-  _try_send(bl, false);
+  _try_send(false);
 }
 
 void AsyncConnection::handle_write()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
-  bufferlist bl;
   ssize_t r = 0;
 
   write_lock.Lock();
@@ -2495,14 +2495,14 @@ void AsyncConnection::handle_write()
     if (left) {
       ceph_le64 s;
       s = in_seq.read();
-      bl.append(CEPH_MSGR_TAG_ACK);
-      bl.append((char*)&s, sizeof(s));
+      outcoming_bl.append(CEPH_MSGR_TAG_ACK);
+      outcoming_bl.append((char*)&s, sizeof(s));
       ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
       ack_left.sub(left);
       left = ack_left.read();
-      r = _try_send(bl, true, left);
+      r = _try_send(true, left);
     } else if (is_queued()) {
-      r = _try_send(bl);
+      r = _try_send();
     }
 
     write_lock.Unlock();
@@ -2519,7 +2519,7 @@ void AsyncConnection::handle_write()
                                  << " policy.server is false" << dendl;
       _connect();
     } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
-      r = _try_send(bl);
+      r = _try_send();
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
         write_lock.Unlock();
index e3d3d5901a5f95115f0dee70ae1f5d7b6d631fd1..4a76a7c1b24113cf13ae57a974a6057d74d5b5b1 100644 (file)
@@ -53,11 +53,12 @@ class AsyncConnection : public Connection {
   ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
   ssize_t try_send(bufferlist &bl, bool send=true, bool more=false) {
     Mutex::Locker l(write_lock);
-    return _try_send(bl, send, more);
+    outcoming_bl.claim_append(bl);
+    return _try_send(send, more);
   }
   // if "send" is false, it will only append bl to send buffer
   // the main usage is avoid error happen outside messenger threads
-  ssize_t _try_send(bufferlist &bl, bool send=true, bool more=false);
+  ssize_t _try_send(bool send=true, bool more=false);
   ssize_t _send(Message *m);
   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
   ssize_t read_until(unsigned needed, char *p);