]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
print strerror before any call to fault()
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 9 Nov 2007 16:28:29 +0000 (16:28 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 9 Nov 2007 16:28:29 +0000 (16:28 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2041 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/msg/SimpleMessenger.cc
trunk/ceph/msg/SimpleMessenger.h

index 9b6df4296b228784a476e137fb03e1116c692515..77588a6dd274070ec5f309b9be8d4d6ef6a03fc7 100644 (file)
@@ -31,6 +31,8 @@
 #include <iostream>
 #include <fstream>
 
+#include "common/Timer.h"
+
 #define dout(l)  if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " "
 #define derr(l)  if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " "
 
@@ -769,7 +771,7 @@ int Rank::Pipe::accept()
   if (tcp_write(sd, &tag, 1) < 0 ||
       tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0) {
     // hrmpf
-    dout(10) << "accept couldn't send initial tag+seq: "
+    dout(2) << "accept couldn't send initial tag+seq: "
              << strerror(errno) << dendl;
     fault();
   }
@@ -862,7 +864,7 @@ int Rank::Pipe::connect()
   msglen = msgvec[0].iov_len + msgvec[1].iov_len;
 
   if (do_sendmsg(newsd, &msg, msglen)) {
-    dout(20) << "connect couldn't write self, seq" << dendl;
+    dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
     goto fail;
   }
 
@@ -871,8 +873,10 @@ int Rank::Pipe::connect()
   // wait for tag
   tag = -1;
   if (tcp_read(newsd, &tag, 1) < 0 ||
-      tcp_read(newsd, (char*)&cseq, sizeof(cseq)) < 0)
+      tcp_read(newsd, (char*)&cseq, sizeof(cseq)) < 0) {
+    dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
     goto fail;
+  }
 
   dout(20) << "connect got initial tag " << (int)tag << " + seq " << cseq << dendl;
 
@@ -880,19 +884,20 @@ int Rank::Pipe::connect()
 
   // FINISH
   if (state != STATE_CONNECTING) {
-    dout(20) << "connect hmm, not connecting anymore, failing" << dendl;
+    dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl;
     goto fail2;  // hmm!
   }
-  if (tag != CEPH_MSGR_TAG_READY) {
-    dout(20) << "connect didn't get READY tag, my connect_seq=" << connect_seq
-             << ", got " << cseq << dendl;
+  if (tag == CEPH_MSGR_TAG_REJECT) {
     if (connect_seq != cseq) {
-      dout(0) << "connect got REJECT tag, old connect_seq was " << connect_seq
+      dout(0) << "connect got REJECT, old connect_seq was " << connect_seq
              << ", taking new " << cseq << dendl;
       connect_seq = cseq;
+    } else {
+      dout(0) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl;
     }
     goto fail2;
   }
+  assert(tag == CEPH_MSGR_TAG_READY);
   state = STATE_OPEN;
   this->sd = newsd;
   connect_seq++;
@@ -909,7 +914,7 @@ int Rank::Pipe::connect()
   lock.Lock();
  fail2:
   if (newsd > 0) ::close(newsd);
-  fault();
+  fault(tag == CEPH_MSGR_TAG_REJECT);
   return -1;
 }
 
@@ -933,29 +938,29 @@ void Rank::Pipe::unregister_pipe()
   }
 }
 
-void Rank::Pipe::fault()
+void Rank::Pipe::fault(bool silent)
 {
   assert(lock.is_locked());
 
   if (q.empty()) {
-    dout(0) << "fault nothing to send, closing" << dendl;
+    if (!silent) dout(0) << "fault nothing to send, closing" << dendl;
     state = STATE_CLOSED;
   } else {
     utime_t now = g_clock.now();
     if (state != STATE_CONNECTING) {
-      dout(0) << "fault initiating reconnect" << dendl;
+      if (!silent) dout(0) << "fault initiating reconnect" << dendl;
       connect_seq++;
       state = STATE_CONNECTING;
       first_fault = now;
     } else if (first_fault.sec() == 0) {
-      dout(0) << "fault during connect" << dendl;
+      if (!silent) dout(0) << "fault during connect" << dendl;
       first_fault = now;
     } else {
       utime_t failinterval = now - first_fault;
       utime_t retryinterval = now - last_attempt;
-      dout(10) << "fault failure was " << failinterval 
-               << " ago, last attempt was at " << last_attempt
-               << ", " << retryinterval << " ago" << dendl;
+      if (!silent) dout(10) << "fault failure was " << failinterval 
+                           << " ago, last attempt was at " << last_attempt
+                           << ", " << retryinterval << " ago" << dendl;
       if (failinterval > g_conf.ms_fail_interval) {
        // give up
        dout(0) << "fault giving up" << dendl;
@@ -1049,7 +1054,7 @@ void Rank::Pipe::reader()
     int rc = tcp_read(sd, (char*)&tag, 1);
     if (rc < 0) {
       lock.Lock();
-      dout(20) << "reader couldn't read tag" << dendl;
+      dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
       fault();
       continue;
     }
@@ -1061,7 +1066,7 @@ void Rank::Pipe::reader()
       int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
       lock.Lock();
       if (rc < 0) {
-       dout(20) << "reader couldn't read ack seq" << dendl;
+       dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
        fault();
       } else {
        dout(15) << "reader got ack seq " << seq << dendl;
@@ -1082,7 +1087,7 @@ void Rank::Pipe::reader()
       dout(20) << "reader got MSG" << dendl;
       Message *m = read_message();
       if (!m) {
-       derr(10) << "reader read null message" << dendl;
+       derr(2) << "reader read null message, " << strerror(errno) << dendl;
        lock.Lock();
        fault();
        continue;
@@ -1131,7 +1136,7 @@ void Rank::Pipe::reader()
     else if (tag == CEPH_MSGR_TAG_CLOSE) {
       dout(20) << "reader got CLOSE" << dendl;
       lock.Lock();
-      fault();  // treat as a fault; i.e. reconnect|close
+      fault(true);  // treat as a fault; i.e. reconnect|close
       continue;
     }
     else {
@@ -1161,7 +1166,17 @@ void Rank::Pipe::reader()
   }
 }
 
-
+/*
+class FakeSocketError : public Context {
+  int sd;
+public:
+  FakeSocketError(int s) : sd(s) {}
+  void finish(int r) {
+    cout << "faking socket error on " << sd << std::endl;
+    ::close(sd);
+  }
+};
+*/
 
 /* write msgs to socket.
  * also, client.
@@ -1198,7 +1213,7 @@ void Rank::Pipe::writer()
        int rc = write_ack(send_seq);
        lock.Lock();
        if (rc < 0) {
-         dout(20) << "writer couldn't write ack" << dendl;
+         dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl;
          fault();
          continue;
        }
@@ -1221,6 +1236,18 @@ void Rank::Pipe::writer()
                  << errno << ": " << strerror(errno) << dendl;
          fault();
         }
+
+       /*
+       // HACK
+       static bool did = false;
+       if (!did && 
+           m->get_source() == entity_name_t::OSD(0) &&
+           m->get_dest() == entity_name_t::OSD(1)) {
+         did = true;
+         dout(0) << "will fake socket error on " << sd << dendl;
+         g_timer.add_event_after(15.0, new FakeSocketError(sd));
+       }
+       */
       }
       continue;
     }
index 4ef9144e343cada913e3f8a2da9a8c0b37fdb54f..af2de93c2b134b6b5247dd16e1336a0bed677378 100644 (file)
@@ -109,7 +109,7 @@ private:
     int do_sendmsg(int sd, struct msghdr *msg, int len);
     int write_ack(unsigned s);
 
-    void fault();
+    void fault(bool silent=false);
     void fail();
 
     void take_queue(list<Message*>& ls) {