]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Allow msg encode without write_lock holding
authorHaomai Wang <haomaiwang@gmail.com>
Thu, 28 May 2015 14:39:24 +0000 (22:39 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sat, 30 May 2015 14:29:55 +0000 (22:29 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 146424dfe15097aa29b9f644e70d82d278e5aa24..6d2b8e89b4c4ef3cf4df061967165f9173893921 100644 (file)
@@ -1937,7 +1937,7 @@ void AsyncConnection::accept(int incoming)
 
 int AsyncConnection::send_message(Message *m)
 {
-  ldout(async_msgr->cct, 10) << __func__ << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << " m=" << m << dendl;
 
   if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
@@ -1952,18 +1952,27 @@ int AsyncConnection::send_message(Message *m)
   logger->inc(l_msgr_send_messages);
 
   bufferlist bl;
-  Mutex::Locker l(write_lock);
-  m->set_seq(out_seq.inc());
-  m->get_header().src = async_msgr->get_myname();
+  uint64_t f = get_features();
+
+  // optimistic think it's ok to encode(actually may broken now)
   if (!m->get_priority())
     m->set_priority(async_msgr->get_default_send_priority());
-  if (can_write == 1)
-    prepare_send_message(m, bl);
 
+  prepare_send_message(f, m, bl);
+
+  Mutex::Locker l(write_lock);
+  m->set_req(out_seq.inc());
+  // "features" changes will change the payload encoding
+  if (can_write == 0 || get_features() != f) {
+    // ensure the correctness of message encoding
+    bl.clear();
+    ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous "
+                              << f << " != " << get_features() << dedl;
+  } else {
+    inject_msg_header_crc(m, bl);
+  }
   if (!is_queued() && can_write == 1) {
-    ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
-    int r = write_message(m, bl);
-    if (r < 0) {
+    if (write_message(m, bl) < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       // we want to handle fault within internal thread
       center->dispatch_event_external(write_handler);
@@ -2185,14 +2194,14 @@ void AsyncConnection::_stop()
   center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
 }
 
-void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
+void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
 {
   assert(write_lock.is_locked());
   ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
+  m->get_header().src = async_msgr->get_myname();
   m->set_connection(this);
-  uint64_t features = get_features();
   if (m->empty_payload())
     ldout(async_msgr->cct, 20) << __func__ << " encoding features "
                                << features << " " << m << " " << *m << dendl;
@@ -2243,12 +2252,8 @@ void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
     oldheader.src.addr = get_peer_addr();
     oldheader.orig_src = oldheader.src;
     oldheader.reserved = header.reserved;
-    if (msgr->crcflags & MSG_CRC_HEADER) {
-       oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
-                                   sizeof(oldheader) - sizeof(oldheader.crc));
-    } else {
-       oldheader.crc = 0;
-    }
+    // delay crc calculate to "inject_msg_header_crc"
+    oldheader.crc = 0;
     bl.append((char*)&oldheader, sizeof(oldheader));
   }
 
@@ -2380,10 +2385,11 @@ void AsyncConnection::handle_write()
         break;
 
       // send_message may not encode message
-      if (!data.length())
-        prepare_send_message(m, data);
+      if (!data.length()) {
+        prepare_send_message(get_features(), m, data);
+        inject_msg_header_crc(m, bl);
+      }
 
-      ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
       r = write_message(m, data);
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
index bf9df82863cbd63efd5f8e87e0771bd8f2fccb18..11850856c129f52598c3d26a9dc2c5ace5e735c8 100644 (file)
@@ -54,7 +54,21 @@ class AsyncConnection : public Connection {
   // the main usage is avoid error happen outside messenger threads
   int _try_send(bufferlist &bl, bool send=true);
   int _send(Message *m);
-  void prepare_send_message(Message *m, bufferlist &bl);
+  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
+#define HEADER_CRC_OFF (sizeof(char)+offset(ceph_msg_header, crc))
+  void inject_msg_header_crc(m, bl) {
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
+        __le32 *header_crc = static_cast<__le32*>(&bl[HEADER_CRC_OFF]);
+        m->calc_header_crc();
+        *header_crc = m->get_header().crc;
+      } else {
+        ceph_msg_header_old *oldheader = static_cast<ceph_msg_header_old*>(&bl[sizeof(char)]);
+        oldheader->crc = ceph_crc32c(0, (unsigned char*)oldheader,
+                                     sizeof(*oldheader) - sizeof(oldheader->crc));
+      }
+    }
+  }
   int read_until(uint64_t needed, char *p);
   int _process_connection();
   void _connect();