git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Using buffer read to avoid small read overhead
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 9 Dec 2014 05:16:52 +0000 (13:16 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:09 +0000 (03:07 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index b071d79abc00d224451de9581c4e7f09af7843b6..bdb1296f4d3b108c5a7d34c6a859bbef871eaac8 100644 (file)
@@ -39,6 +39,8 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
         << ").";
 }
 
+const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;  // combined with ms_tcp_prefetch_max_size via MIN() in the constructor to set recv_max_prefetch; NOTE(review): the name suggests a lower bound, but MIN() makes 512 an upper bound -- confirm intent
+
 class C_time_wakeup : public EventCallback {
   AsyncConnectionRef conn;
 
@@ -185,10 +187,11 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0),
     out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1),
-    port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false),
-    stop_lock("AsyncConnection::stop_lock"),
-    got_bad_auth(false), authorizer(NULL), replacing(false), stopping(0),
-    state_buffer(4096), state_offset(0), net(cct), center(c)
+    port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL),
+    recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
+    recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"),
+    got_bad_auth(false), authorizer(NULL), replacing(false),
+    state_buffer(NULL), state_offset(0), net(cct), center(c)
 {
   read_handler.reset(new C_handle_read(this));
   write_handler.reset(new C_handle_write(this));
@@ -200,11 +203,18 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
   accept_handler.reset(new C_deliver_accept(async_msgr, this));
   local_deliver_handler.reset(new C_local_deliver(this));
   memset(msgvec, 0, sizeof(msgvec));
+  // double recv_max_prefetch see "read_until"
+  recv_buf = new char[2*recv_max_prefetch];
+  state_buffer = new char[4096];
 }
 
 AsyncConnection::~AsyncConnection()
 {
   assert(!authorizer);
+  if (recv_buf)
+    delete recv_buf;
+  if (state_buffer)
+    delete state_buffer;
 }
 
 /* return -1 means `fd` occurs error or closed, it should be closed
@@ -364,31 +374,77 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
 // the needed buffer, so the passed in bufferptr must be the same.
 // Normally, only "read_message" will pass existing bufferptr in
 //
+// It also uses a readahead strategy to reduce the overhead of small reads;
+// "recv_buf" holds the bytes that were read ahead.
+//
 // return the remaining bytes, 0 means this buffer is finished
 // else return < 0 means error
-int AsyncConnection::read_until(uint64_t needed, bufferptr &p)
+int AsyncConnection::read_until(uint64_t len, char *p)  // fills p with len bytes across one or more calls: returns 0 when complete, the number of bytes still missing otherwise, -1 on read error
 {
-  assert(needed);
-  int offset = state_offset;
-  int left = needed - offset;
-  int r;
-  do {
-    r = read_bulk(sd, p.c_str()+offset, left);
-    if (r < 0) {
-      ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
-      return -1;
-    } else if (r == left) {
-      state_offset = 0;
+  assert(len);
+  ldout(async_msgr->cct, 20) << __func__ << " len is " << len << " state_offset is "
+                             << state_offset << dendl;
+  int r = 0;
+  uint64_t left = len - state_offset;  // bytes of this request not yet copied into p (state_offset persists between calls)
+  if (recv_end > recv_start) {  // unread bytes remain in recv_buf: serve the request from them first
+    assert(state_offset == 0);
+    uint64_t to_read = MIN(recv_end - recv_start, left);
+    memcpy(p, recv_buf+recv_start, to_read);
+    recv_start += to_read;
+    left -= to_read;
+    ldout(async_msgr->cct, 20) << __func__ << " got " << to_read << " in buffer "
+                               << " left is " << left << " buffer still has "
+                               << recv_end - recv_start << dendl;
+    if (left == 0) {
       return 0;
     }
-    left -= r;
-    offset += r;
-  } while (r > 0);
-
-  state_offset = offset;
-  ldout(async_msgr->cct, 20) << __func__ << " read " << r << " bytes, state is "
-                      << get_state_name(state) << dendl;
-  return needed - offset;
+    state_offset += to_read;  // buffer only partly satisfied the request; the socket reads below continue at this offset in p
+  }
+
+  assert(recv_end == recv_start);
+  recv_end = recv_start = 0;  // buffer is drained here, so rewind both cursors to its start
+  /* nothing left in the prefetch buffer */
+  if (len > recv_max_prefetch) {
+    /* this was a large read, we don't prefetch for these */
+    do {
+      r = read_bulk(sd, p+state_offset, left);  // read directly into the caller's buffer, bypassing recv_buf
+      ldout(async_msgr->cct, 20) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
+      if (r < 0) {
+        ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
+        return -1;
+      } else if (r == static_cast<int>(left)) {
+        state_offset = 0;  // request complete: reset progress for the next request
+        return 0;
+      }
+      state_offset += r;
+      left -= r;
+    } while (r > 0);
+  } else {
+    do {
+      r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch);  // ask for up to recv_max_prefetch bytes, possibly more than this request needs
+      ldout(async_msgr->cct, 20) << __func__ << " read_bulk recv_end is " << recv_end
+                                 << " left is " << left << " got " << r << dendl;
+      if (r < 0) {
+        ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
+        return -1;
+      }
+      recv_end += r;
+      if (r >= static_cast<int>(left)) {
+        recv_start = len - state_offset;  // bytes of recv_buf consumed by this request; anything beyond stays buffered for later calls
+        memcpy(p+state_offset, recv_buf, recv_start);
+        state_offset = 0;
+        return 0;
+      }
+      left -= r;
+    } while (r > 0);
+    memcpy(p+state_offset, recv_buf, recv_end-recv_start);  // loop ended with read_bulk returning 0 before the request was satisfied: hand over what was read so far
+    state_offset += (recv_end - recv_start);
+    recv_end = recv_start = 0;  // everything read was copied out, so the buffer is empty again
+  }
+  ldout(async_msgr->cct, 20) << __func__ << " need len " << len << " remaining "
+                             << len - state_offset << " bytes, state is "
+                             << get_state_name(state) << dendl;
+  return len - state_offset;
 }
 
 void AsyncConnection::process()
@@ -404,15 +460,14 @@ void AsyncConnection::process()
       case STATE_OPEN:
         {
           char tag = -1;
-          r = read_bulk(sd, &tag, sizeof(tag));
+          r = read_until(sizeof(tag), &tag);
           if (r < 0) {
             ldout(async_msgr->cct, 1) << __func__ << " read tag failed, state is "
                                       << get_state_name(state) << dendl;
             goto fail;
-          } else if (r == 0) {
+          } else if (r > 0) {
             break;
           }
-          assert(r == 1);
 
           if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
             ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
@@ -446,7 +501,7 @@ void AsyncConnection::process()
           }
 
           ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
-          t = (ceph_timespec*)(state_buffer.c_str());
+          t = (ceph_timespec*)state_buffer;
           utime_t kp_t = utime_t(*t);
           _send_keepalive_or_ack(true, &kp_t);
           ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
@@ -465,7 +520,7 @@ void AsyncConnection::process()
             break;
           }
 
-          t = (ceph_timespec*)(state_buffer.c_str());
+          t = (ceph_timespec*)state_buffer;
           last_keepalive_ack = utime_t(*t);
           ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
           state = STATE_OPEN;
@@ -483,7 +538,7 @@ void AsyncConnection::process()
             break;
           }
 
-          seq = (ceph_le64*)(state_buffer.c_str());
+          seq = (ceph_le64*)state_buffer;
           ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl;
           handle_ack(*seq);
           state = STATE_OPEN;
@@ -513,11 +568,11 @@ void AsyncConnection::process()
           ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl;
 
           if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
-            header = *((ceph_msg_header*)state_buffer.c_str());
+            header = *((ceph_msg_header*)state_buffer);
             header_crc = ceph_crc32c(0, (unsigned char *)&header,
                                     sizeof(header) - sizeof(header.crc));
           } else {
-            oldheader = *((ceph_msg_header_old*)state_buffer.c_str());
+            oldheader = *((ceph_msg_header_old*)state_buffer);
             // this is fugly
             memcpy(&header, &oldheader, sizeof(header));
             header.src = oldheader.src.name;
@@ -588,7 +643,7 @@ void AsyncConnection::process()
           int front_len = current_header.front_len;
           if (front_len) {
             bufferptr ptr = buffer::create(front_len);
-            r = read_until(front_len, ptr);
+            r = read_until(front_len, ptr.c_str());
             if (r < 0) {
               ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl;
               goto fail;
@@ -609,7 +664,7 @@ void AsyncConnection::process()
           int middle_len = current_header.middle_len;
           if (middle_len) {
             bufferptr ptr = buffer::create(middle_len);
-            r = read_until(middle_len, ptr);
+            r = read_until(middle_len, ptr.c_str());
             if (r < 0) {
               ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl;
               goto fail;
@@ -658,7 +713,7 @@ void AsyncConnection::process()
           while (msg_left > 0) {
             bufferptr bp = data_blp.get_current_ptr();
             uint64_t read = MIN(bp.length(), msg_left);
-            r = read_until(read, bp);
+            r = read_until(read, bp.c_str());
             if (r < 0) {
               ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
               goto fail;
@@ -697,9 +752,9 @@ void AsyncConnection::process()
           }
 
           if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
-            footer = *((ceph_msg_footer*)state_buffer.c_str());
+            footer = *((ceph_msg_footer*)state_buffer);
           } else {
-            old_footer = *((ceph_msg_footer_old*)state_buffer.c_str());
+            old_footer = *((ceph_msg_footer_old*)state_buffer);
             footer.front_crc = old_footer.front_crc;
             footer.middle_crc = old_footer.middle_crc;
             footer.data_crc = old_footer.data_crc;
@@ -910,7 +965,7 @@ int AsyncConnection::_process_connection()
           break;
         }
 
-        if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) {
+        if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
           ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
                               << get_peer_addr() << dendl;
           goto fail;
@@ -919,7 +974,7 @@ int AsyncConnection::_process_connection()
         ldout(async_msgr->cct, 10) << __func__ << " get banner, ready to send banner" << dendl;
 
         bufferlist bl;
-        bl.append(state_buffer.c_str(), strlen(CEPH_BANNER));
+        bl.append(state_buffer, strlen(CEPH_BANNER));
         r = _try_send(bl);
         if (r == 0) {
           state = STATE_CONNECTING_WAIT_IDENTIFY_PEER;
@@ -950,7 +1005,7 @@ int AsyncConnection::_process_connection()
         }
 
         bufferlist bl;
-        bl.append(state_buffer);
+        bl.append(state_buffer, sizeof(paddr)*2);
         bufferlist::iterator p = bl.begin();
         try {
           ::decode(paddr, p);
@@ -1057,7 +1112,7 @@ int AsyncConnection::_process_connection()
           break;
         }
 
-        connect_reply = *((ceph_msg_connect_reply*)state_buffer.c_str());
+        connect_reply = *((ceph_msg_connect_reply*)state_buffer);
         connect_reply.features = ceph_sanitize_features(connect_reply.features);
 
         ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag
@@ -1083,7 +1138,7 @@ int AsyncConnection::_process_connection()
             break;
           }
 
-          authorizer_reply.push_back(state_buffer);
+          authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
           bufferlist::iterator iter = authorizer_reply.begin();
           if (authorizer && !authorizer->verify_reply(iter)) {
             ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
@@ -1112,7 +1167,7 @@ int AsyncConnection::_process_connection()
           break;
         }
 
-        newly_acked_seq = *((uint64_t*)state_buffer.c_str());
+        newly_acked_seq = *((uint64_t*)state_buffer);
         ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
                             << " vs out_seq " << out_seq << dendl;
         while (newly_acked_seq > out_seq) {
@@ -1231,13 +1286,13 @@ int AsyncConnection::_process_connection()
           break;
         }
 
-        if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) {
-          ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer.c_str()
+        if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
+          ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer
                                     << "' (should be '" << CEPH_BANNER << "')" << dendl;
           goto fail;
         }
 
-        addr_bl.append(state_bufferstrlen(CEPH_BANNER), sizeof(peer_addr));
+        addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(peer_addr));
         {
           bufferlist::iterator ti = addr_bl.begin();
           ::decode(peer_addr, ti);
@@ -1267,7 +1322,7 @@ int AsyncConnection::_process_connection()
           break;
         }
 
-        connect_msg = *((ceph_msg_connect*)state_buffer.c_str());
+        connect_msg = *((ceph_msg_connect*)state_buffer);
         // sanitize features
         connect_msg.features = ceph_sanitize_features(connect_msg.features);
         state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH;
@@ -1286,7 +1341,7 @@ int AsyncConnection::_process_connection()
           } else if (r > 0) {
             break;
           }
-          authorizer_bl.push_back(state_buffer);
+          authorizer_bl.append(state_buffer, connect_msg.authorizer_len);
         }
 
         ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq "
@@ -1319,7 +1374,7 @@ int AsyncConnection::_process_connection()
           break;
         }
 
-        newly_acked_seq = *((uint64_t*)state_buffer.c_str());
+        newly_acked_seq = *((uint64_t*)state_buffer);
         ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl;
         discard_requeued_up_to(newly_acked_seq);
         state = STATE_ACCEPTING_READY;
@@ -1855,6 +1910,8 @@ void AsyncConnection::fault()
 
   // requeue sent items
   requeue_sent();
+  recv_start = recv_end = 0;
+  state_offset = 0;
   outcoming_bl.clear();
   if (policy.standby && !is_queued()) {
     ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
index c3572c4f17131a86efbad8e4705550fa9a741d3a..9b40a692f6f5f90a7101e976f6e1317e03561900 100644 (file)
@@ -50,7 +50,7 @@ class AsyncConnection : public Connection {
   // the main usage is avoid error happen outside messenger threads
   int _try_send(bufferlist bl, bool send=true);
   int _send(Message *m);
-  int read_until(uint64_t needed, bufferptr &p);
+  int read_until(uint64_t needed, char *p);
   int _process_connection();
   void _connect();
   void _stop();
@@ -195,6 +195,7 @@ class AsyncConnection : public Connection {
     STATE_WAIT,       // just wait for racing connection
   };
 
+  static const int TCP_PREFETCH_MIN_SIZE;
   static const char *get_state_name(int state) {
       const char* const statenames[] = {"STATE_NONE",
                                         "STATE_OPEN",
@@ -263,6 +264,10 @@ class AsyncConnection : public Connection {
   EventCallbackRef local_deliver_handler;
   bool keepalive;
   struct iovec msgvec[IOV_MAX];
+  char *recv_buf;
+  uint32_t recv_max_prefetch;
+  uint32_t recv_start;
+  uint32_t recv_end;
   Mutex stop_lock; // used to protect `mark_down_cond`
   Cond stop_cond;
   set<uint64_t> register_time_events; // need to delete it if stop
@@ -294,7 +299,7 @@ class AsyncConnection : public Connection {
   atomic_t stopping;
 
   // used only for local state, it will be overwrite when state transition
-  bufferptr state_buffer;
+  char *state_buffer;
   // used only by "read_until"
   uint64_t state_offset;
   bufferlist outcoming_bl;