]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Make header insert when sending
authorHaomai Wang <haomaiwang@gmail.com>
Thu, 28 May 2015 17:24:36 +0000 (01:24 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sun, 31 May 2015 03:03:44 +0000 (11:03 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index eed275db5961df7915f9200a44095371dc91fd6f..43b4ca515f287c213a0bc8922dd35d71d34e77fe 100644 (file)
@@ -1804,7 +1804,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     existing->lock.Unlock();
     return 0;
   }
-  existing->write_lock.Unlock();
   existing->lock.Unlock();
 
  open:
@@ -1848,9 +1847,10 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (reply.authorizer_len)
     reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
 
-  uint64_t s = in_seq.read();
-  if (reply.tag == CEPH_MSGR_TAG_SEQ)
+  if (reply.tag == CEPH_MSGR_TAG_SEQ) {
+    uint64_t s = in_seq.read();
     reply_bl.append((char*)&s, sizeof(s));
+  }
 
   lock.Unlock();
   // Because "replacing" will prevent other connections preempt this addr,
@@ -1963,15 +1963,13 @@ int AsyncConnection::send_message(Message *m)
   prepare_send_message(f, m, bl);
 
   Mutex::Locker l(write_lock);
-  m->set_req(out_seq.inc());
+  m->set_seq(out_seq.inc());
   // "features" changes will change the payload encoding
   if (can_write == NOWRITE || get_features() != f) {
     // ensure the correctness of message encoding
     bl.clear();
     ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous "
-                              << f << " != " << get_features() << dedl;
-  } else {
-    inject_msg_header_crc(m, bl);
+                              << f << " != " << get_features() << dendl;
   }
   if (!is_queued() && can_write == CANWRITE) {
     if (write_message(m, bl) < 0) {
@@ -1999,11 +1997,11 @@ void AsyncConnection::requeue_sent()
 
   list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
   while (!sent.empty()) {
-    pair<bufferlist, Message*> p = sent.back();
+    Message* m = sent.back();
     sent.pop_back();
-    ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
-                         << " (" << p.second->get_seq() << ")" << dendl;
-    rq.push_front(p);
+    ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
+                               << " (" << m->get_seq() << ")" << dendl;
+    rq.push_front(make_pair(bufferlist(), m));
   }
 }
 
@@ -2036,9 +2034,9 @@ void AsyncConnection::discard_out_queue()
   ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
   assert(write_lock.is_locked());
 
-  for (list<pair<bufferlist, Message*> >::iterator p = sent.begin(); p != sent.end(); ++p) {
-    ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl;
-    p->second->put();
+  for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
+    ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
+    (*p)->put();
   }
   sent.clear();
   for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
@@ -2072,7 +2070,6 @@ void AsyncConnection::fault()
 {
   if (state == STATE_CLOSED) {
     ldout(async_msgr->cct, 10) << __func__ << " state is already " << get_state_name(state) << dendl;
-    center->dispatch_event_external(reset_handler);
     return ;
   }
 
@@ -2200,7 +2197,6 @@ void AsyncConnection::_stop()
 
 void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
 {
-  assert(write_lock.is_locked());
   ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
@@ -2243,25 +2239,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer
     }
   }
 
-  // send tag
-  char tag = CEPH_MSGR_TAG_MSG;
-  bl.append(&tag, sizeof(tag));
-
-  ceph_msg_header_old oldheader;
-  if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
-    bl.append((char*)&header, sizeof(header));
-  } else {
-    memcpy(&oldheader, &header, sizeof(header));
-    oldheader.src.name = header.src;
-    oldheader.src.addr = get_peer_addr();
-    oldheader.orig_src = oldheader.src;
-    oldheader.reserved = header.reserved;
-    // delay crc calculate to "inject_msg_header_crc"
-    oldheader.crc = 0;
-    bl.append((char*)&oldheader, sizeof(oldheader));
-  }
-
-  bl.claim_append(m->get_payload());
+  bl.append(m->get_payload());
   bl.append(m->get_middle());
   bl.append(m->get_data());
 
@@ -2288,15 +2266,40 @@ int AsyncConnection::write_message(Message *m, bufferlist& bl)
 {
   assert(write_lock.is_locked());
   assert(can_write == CANWRITE);
+
   if (!policy.lossy) {
     // put on sent list
-    sent.push_back(make_pair(bl, m));
+    sent.push_back(m);
     m->get();
   }
 
+  bufferlist complete_bl;
+  // send tag
+  char tag = CEPH_MSGR_TAG_MSG;
+  complete_bl.append(&tag, sizeof(tag));
+
+  m->calc_header_crc();
+  ceph_msg_header& header = m->get_header();
+  if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
+    complete_bl.append((char*)&header, sizeof(header));
+  } else {
+    ceph_msg_header_old oldheader;
+    memcpy(&oldheader, &header, sizeof(header));
+    oldheader.src.name = header.src;
+    oldheader.src.addr = get_peer_addr();
+    oldheader.orig_src = oldheader.src;
+    oldheader.reserved = header.reserved;
+    // delay crc calculate to "inject_msg_header_crc"
+    oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
+                                sizeof(oldheader) - sizeof(oldheader.crc));
+    complete_bl.append((char*)&oldheader, sizeof(oldheader));
+  }
+
+  complete_bl.claim_append(bl);
+
   ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
                              << " " << m << dendl;
-  int rc = _try_send(bl);
+  int rc = _try_send(complete_bl);
   if (rc < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
                               << cpp_strerror(errno) << dendl;
@@ -2315,13 +2318,13 @@ void AsyncConnection::handle_ack(uint64_t seq)
   ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
   // trim sent list
   Mutex::Locker l(write_lock);
-  while (!sent.empty() && sent.front().second->get_seq() <= seq) {
-    pair<bufferlist, Message*> p = sent.front();
+  while (!sent.empty() && sent.front()->get_seq() <= seq) {
+    Message* m = sent.front();
     sent.pop_front();
     ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
-                               << seq << " >= " << p.second->get_seq() << " on "
-                               << p.second << " " << *(p.second) << dendl;
-    p.second->put();
+                               << seq << " >= " << m->get_seq() << " on "
+                               << m << " " << *m << dendl;
+    m->put();
   }
 }
 
@@ -2387,15 +2390,14 @@ void AsyncConnection::handle_write()
       if (!m)
         break;
 
-      // send_message may not encode message
-      if (!data.length()) {
+      // send_message or requeue messages may not encode message
+      if (!data.length())
         prepare_send_message(get_features(), m, data);
-        inject_msg_header_crc(m, bl);
-      }
 
       r = write_message(m, data);
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
+        write_lock.Unlock();
         goto fail;
       } else if (r > 0) {
         break;
@@ -2409,30 +2411,19 @@ void AsyncConnection::handle_write()
       bl.append(CEPH_MSGR_TAG_ACK);
       bl.append((char*)&s, sizeof(s));
       ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
-      ack_left.sub(ack_left);
+      ack_left.sub(left);
       r = _try_send(bl);
     } else if (is_queued()) {
       r = _try_send(bl);
     }
 
+    write_lock.Unlock();
     if (r < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       goto fail;
     }
-    write_lock.Unlock();
   } else {
     write_lock.Unlock();
-
-    if (async_msgr->cct->_conf->ms_inject_internal_delays) {
-      if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
-        ldout(msgr->cct, 10) << __func__ << " sleep for "
-                             << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
-        utime_t t;
-        t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
-        t.sleep();
-      }
-    }
-
     lock.Lock();
     write_lock.Lock();
     if (state == STATE_STANDBY && !policy.server && is_queued()) {
@@ -2444,7 +2435,9 @@ void AsyncConnection::handle_write()
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
         write_lock.Unlock();
-        goto fail;
+        fault();
+        lock.Unlock();
+        return ;
       }
     }
     write_lock.Unlock();
@@ -2452,7 +2445,9 @@ void AsyncConnection::handle_write()
   }
 
   return ;
+
  fail:
+  lock.Lock();
   fault();
   lock.Unlock();
 }
index 9a6d24164824eb3161fb3e177ff5533f376513d2..64c2921d904dd44cfd126b7bf349f0f185fd5c52 100644 (file)
@@ -55,20 +55,6 @@ class AsyncConnection : public Connection {
   int _try_send(bufferlist &bl, bool send=true);
   int _send(Message *m);
   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
-#define HEADER_CRC_OFF (sizeof(char)+offset(ceph_msg_header, crc))
-  void inject_msg_header_crc(m, bl) {
-    if (msgr->crcflags & MSG_CRC_HEADER) {
-      if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
-        __le32 *header_crc = static_cast<__le32*>(&bl[HEADER_CRC_OFF]);
-        m->calc_header_crc();
-        *header_crc = m->get_header().crc;
-      } else {
-        ceph_msg_header_old *oldheader = static_cast<ceph_msg_header_old*>(&bl[sizeof(char)]);
-        oldheader->crc = ceph_crc32c(0, (unsigned char*)oldheader,
-                                     sizeof(*oldheader) - sizeof(oldheader->crc));
-      }
-    }
-  }
   int read_until(uint64_t needed, char *p);
   int _process_connection();
   void _connect();
@@ -249,7 +235,7 @@ class AsyncConnection : public Connection {
   } can_write;
   bool open_write;
   map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
-  list<pair<bufferlist, Message*> > sent; // the first bufferlist need to inject seq
+  list<Message*> sent; // the first bufferlist need to inject seq
   list<Message*> local_messages;    // local deliver
   bufferlist outcoming_bl;
   bool keepalive;