git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Add ms_inject_* to AsyncConnection
author Haomai Wang <haomaiwang@gmail.com>
Tue, 13 Jan 2015 14:18:02 +0000 (22:18 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:12 +0000 (03:07 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc

index b588c4fe6e82dae83b460384829e4380740ae4ba..b7d3b374aff9441addb47e6c10a8b768837d2eaa 100644 (file)
 #define dout_prefix _conn_prefix(_dout)
 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
   return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
-        << " sd=" << sd << " :" << port
-        << " s=" << get_state_name(state)
-        << " pgs=" << peer_global_seq
-        << " cs=" << connect_seq
-        << " l=" << policy.lossy
-        << ").";
+                << " sd=" << sd << " :" << port
+                << " s=" << get_state_name(state)
+                << " pgs=" << peer_global_seq
+                << " cs=" << connect_seq
+                << " l=" << policy.lossy
+                << ").";
 }
 
 const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
@@ -204,12 +204,12 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len)
     if (errno == EAGAIN || errno == EINTR) {
       nread = 0;
     } else {
-      ldout(async_msgr->cct, 1) << __func__ << " Reading from fd=" << fd
+      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd
                           << " : "<< strerror(errno) << dendl;
       return -1;
     }
   } else if (nread == 0) {
-    ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor "
+    ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
                               << fd << dendl;
     return -1;
   }
@@ -289,6 +289,13 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     return -EINTR;
   }
 
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
+      ::shutdown(sd, SHUT_RDWR);
+    }
+  }
+
   uint64_t sent = 0;
   list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
   uint64_t left_pbrs = outcoming_bl.buffers().size();
@@ -362,6 +369,14 @@ int AsyncConnection::read_until(uint64_t len, char *p)
   assert(len);
   ldout(async_msgr->cct, 20) << __func__ << " len is " << len << " state_offset is "
                              << state_offset << dendl;
+
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
+      ::shutdown(sd, SHUT_RDWR);
+    }
+  }
+
   int r = 0;
   uint64_t left = len - state_offset;
   if (recv_end > recv_start) {
@@ -567,7 +582,7 @@ void AsyncConnection::process()
 
           // verify header crc
           if (header_crc != header.crc) {
-            ldout(async_msgr->cct,0) << __func__ << "reader got bad header crc "
+            ldout(async_msgr->cct,0) << __func__ << " reader got bad header crc "
                               << header_crc << " != " << header.crc << dendl;
             goto fail;
           }
@@ -764,7 +779,7 @@ void AsyncConnection::process()
           //
 
           if (session_security.get() == NULL) {
-            ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl;
+            ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl;
           } else {
             if (session_security->check_message_signature(message)) {
               ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl;
@@ -1076,7 +1091,7 @@ int AsyncConnection::_process_connection()
         r = _try_send(bl);
         if (r == 0) {
           state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
-          ldout(async_msgr->cct,20) << __func__ << "connect wrote (self +) cseq, waiting for reply" << dendl;
+          ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
         } else if (r > 0) {
           state = STATE_WAIT_SEND;
           state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY;
@@ -1194,9 +1209,9 @@ int AsyncConnection::_process_connection()
         assert(connect_seq == connect_reply.connect_seq);
         backoff = utime_t();
         set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
-        ldout(async_msgr->cct, 10) << __func__ << "connect success " << connect_seq
-                             << ", lossy = " << policy.lossy << ", features "
-                             << get_features() << dendl;
+        ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
+                                   << ", lossy = " << policy.lossy << ", features "
+                                   << get_features() << dendl;
 
         // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
         // connection.  PLR
@@ -1452,11 +1467,11 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
   }
 
   if (reply.tag == CEPH_MSGR_TAG_SEQ) {
-    ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
+    ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
     state = STATE_CONNECTING_WAIT_ACK_SEQ;
   }
   if (reply.tag == CEPH_MSGR_TAG_READY) {
-    ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_READY " << dendl;
+    ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
     state = STATE_CONNECTING_READY;
   }
 
@@ -1480,7 +1495,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
 
   // mismatch?
-  ldout(async_msgr->cct,10) << __func__ << "accept my proto " << reply.protocol_version
+  ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version
                       << ", their proto " << connect.protocol_version << dendl;
   if (connect.protocol_version != reply.protocol_version) {
     return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply);
@@ -1504,7 +1519,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
   uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
   if (feat_missing) {
-    ldout(async_msgr->cct, 1) << __func__ << "peer missing required features "
+    ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
                         << std::hex << feat_missing << std::dec << dendl;
     return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
   }
@@ -1518,11 +1533,20 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
 
   // We've verified the authorizer for this AsyncConnection, so set up the session security structure.  PLR
-  ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
 
   // existing?
   lock.Unlock();
   AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
+
+  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << __func__ << " sleep for "
+                         << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
   lock.Lock();
   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state="
@@ -1551,7 +1575,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
       goto replace;
     }
 
-    ldout(async_msgr->cct, 0) << __func__ << "accept connect_seq " << connect.connect_seq
+    ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
                               << " vs existing " << existing->connect_seq
                               << " state " << existing->state << dendl;
 
@@ -1566,7 +1590,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
     if (connect.connect_seq < existing->connect_seq) {
       // old attempt, or we sent READY but they didn't get it.
-      ldout(async_msgr->cct, 10) << __func__ << "accept existing " << existing << ".cseq "
+      ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq "
                            << existing->connect_seq << " > " << connect.connect_seq
                            << ", RETRY_SESSION" << dendl;
       reply.connect_seq = existing->connect_seq + 1;
@@ -1610,7 +1634,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     assert(connect.global_seq >= existing->peer_global_seq);
     if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
         existing->connect_seq == 0) {
-      ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
+      ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
                           << connect.connect_seq << ", " << existing << ".cseq = "
                           << existing->connect_seq << "), sending RESETSESSION" << dendl;
       return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
@@ -1623,12 +1647,12 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   } // existing
   else if (!replacing && connect.connect_seq > 0) {
     // we reset, and they are opening a new session
-    ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
+    ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
                         << connect.connect_seq << "), sending RESETSESSION" << dendl;
     return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
   } else {
     // new session
-    ldout(async_msgr->cct,10) << __func__ << "accept new session" << dendl;
+    ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl;
     existing = NULL;
     goto open;
   }
@@ -1642,6 +1666,14 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
   ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
 
+  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << __func__ << " sleep for "
+                         << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
   // There is no possible that existing connection will acquire this lock
   existing->lock.Lock();
 
@@ -1744,17 +1776,26 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   // if replacing, this con is alreadly accepted.
   lock.Unlock();
   r = async_msgr->accept_conn(this);
+
+  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << __func__ << " sleep for "
+                         << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
   lock.Lock();
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
                               << " just fail later one(this)" << dendl;
-    goto fail;
+    goto fail_registered;
   }
   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state="
                               << get_state_name(state) << dendl;
     assert(state == STATE_CLOSED);
-    goto fail;
+    goto fail_registered;
   }
 
   // notify
@@ -1763,7 +1804,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
   r = _try_send(reply_bl);
   if (r < 0)
-    goto fail;
+    goto fail_registered;
 
   if (r == 0) {
     state = next_state;
@@ -1775,13 +1816,26 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
   return 0;
 
+ fail_registered:
+  ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
+
+  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(async_msgr->cct, 10) << __func__ << " sleep for "
+                               << async_msgr->cct->_conf->ms_inject_internal_delays
+                               << dendl;
+    utime_t t;
+    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
  fail:
+  ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
   return -1;
 }
 
 void AsyncConnection::_connect()
 {
-  ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;
 
   state = STATE_CONNECTING;
   stopping.set(0);
@@ -1792,7 +1846,7 @@ void AsyncConnection::_connect()
 
 void AsyncConnection::accept(int incoming)
 {
-  ldout(async_msgr->cct, 10) << __func__ << " " << incoming << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
   assert(sd < 0);
 
   sd = incoming;
@@ -1877,7 +1931,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq)
  */
 void AsyncConnection::discard_out_queue()
 {
-  ldout(async_msgr->cct, 10) << __func__ << " " << dendl;
+  ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
 
   for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
     ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
@@ -1900,7 +1954,7 @@ int AsyncConnection::randomize_out_seq()
     // here.  We'll check it on the call.  PLR
     int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
     out_seq &= SEQ_MASK;
-    lsubdout(async_msgr->cct, ms, 10) << __func__ << "randomize_out_seq " << out_seq << dendl;
+    lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << out_seq << dendl;
     return seq_error;
   } else {
     // previously, seq #'s always started at 0.
@@ -1970,13 +2024,13 @@ void AsyncConnection::fault()
 
 void AsyncConnection::was_session_reset()
 {
-  ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl;
+  ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
   discard_out_queue();
 
   center->dispatch_event_external(remote_reset_handler);
 
   if (randomize_out_seq()) {
-    lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+    lsubdout(async_msgr->cct,ms,15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
   }
 
   in_seq = 0;
@@ -1993,6 +2047,16 @@ void AsyncConnection::_stop()
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
 
   async_msgr->unregister_conn(this);
+
+  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << __func__ << " sleep for "
+                         << async_msgr->cct->_conf->ms_inject_internal_delays
+                         << dendl;
+    utime_t t;
+    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
   shutdown_socket();
   discard_out_queue();
   open_write = false;