From 59f1c4bd8c40ed16ffd3d7fe187f407e70a59c2f Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 9 Nov 2007 16:28:29 +0000 Subject: [PATCH] print strerror before any call to fault() git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2041 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/msg/SimpleMessenger.cc | 71 +++++++++++++++++++++---------- trunk/ceph/msg/SimpleMessenger.h | 2 +- 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index 9b6df4296b228..77588a6dd2740 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -31,6 +31,8 @@ #include #include +#include "common/Timer.h" + #define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " " #define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " " @@ -769,7 +771,7 @@ int Rank::Pipe::accept() if (tcp_write(sd, &tag, 1) < 0 || tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0) { // hrmpf - dout(10) << "accept couldn't send initial tag+seq: " + dout(2) << "accept couldn't send initial tag+seq: " << strerror(errno) << dendl; fault(); } @@ -862,7 +864,7 @@ int Rank::Pipe::connect() msglen = msgvec[0].iov_len + msgvec[1].iov_len; if (do_sendmsg(newsd, &msg, msglen)) { - dout(20) << "connect couldn't write self, seq" << dendl; + dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl; goto fail; } @@ -871,8 +873,10 @@ int Rank::Pipe::connect() // wait for tag tag = -1; if (tcp_read(newsd, &tag, 1) < 0 || - tcp_read(newsd, (char*)&cseq, sizeof(cseq)) < 0) + tcp_read(newsd, (char*)&cseq, sizeof(cseq)) < 0) { + dout(2) << "connect read tag, seq, " << strerror(errno) << dendl; goto fail; + } dout(20) << "connect got initial tag " << (int)tag << " + seq " << cseq << dendl; @@ -880,19 +884,20 @@ int Rank::Pipe::connect() // FINISH if (state != STATE_CONNECTING) { - dout(20) << "connect hmm, not connecting anymore, failing" << dendl; + dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl; goto fail2; // hmm! } - if (tag != CEPH_MSGR_TAG_READY) { - dout(20) << "connect didn't get READY tag, my connect_seq=" << connect_seq - << ", got " << cseq << dendl; + if (tag == CEPH_MSGR_TAG_REJECT) { if (connect_seq != cseq) { - dout(0) << "connect got REJECT tag, old connect_seq was " << connect_seq + dout(0) << "connect got REJECT, old connect_seq was " << connect_seq << ", taking new " << cseq << dendl; connect_seq = cseq; + } else { + dout(0) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl; } goto fail2; } + assert(tag == CEPH_MSGR_TAG_READY); state = STATE_OPEN; this->sd = newsd; connect_seq++; @@ -909,7 +914,7 @@ int Rank::Pipe::connect() lock.Lock(); fail2: if (newsd > 0) ::close(newsd); - fault(); + fault(tag == CEPH_MSGR_TAG_REJECT); return -1; } @@ -933,29 +938,29 @@ void Rank::Pipe::unregister_pipe() } } -void Rank::Pipe::fault() +void Rank::Pipe::fault(bool silent) { assert(lock.is_locked()); if (q.empty()) { - dout(0) << "fault nothing to send, closing" << dendl; + if (!silent) dout(0) << "fault nothing to send, closing" << dendl; state = STATE_CLOSED; } else { utime_t now = g_clock.now(); if (state != STATE_CONNECTING) { - dout(0) << "fault initiating reconnect" << dendl; + if (!silent) dout(0) << "fault initiating reconnect" << dendl; connect_seq++; state = STATE_CONNECTING; first_fault = now; } else if (first_fault.sec() == 0) { - dout(0) << "fault during connect" << dendl; + if (!silent) dout(0) << "fault during connect" << dendl; first_fault = now; } else { utime_t failinterval = now - first_fault; utime_t retryinterval = now - last_attempt; - dout(10) << "fault failure was " << failinterval - << " ago, last attempt was at " << last_attempt - << ", " << retryinterval << " ago" << dendl; + if (!silent) dout(10) << "fault failure was " << failinterval + << " ago, last attempt was at " << last_attempt + << ", " << retryinterval << " ago" << dendl; if (failinterval > g_conf.ms_fail_interval) { // give up dout(0) << "fault giving up" << dendl; @@ -1049,7 +1054,7 @@ void Rank::Pipe::reader() int rc = tcp_read(sd, (char*)&tag, 1); if (rc < 0) { lock.Lock(); - dout(20) << "reader couldn't read tag" << dendl; + dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl; fault(); continue; } @@ -1061,7 +1066,7 @@ void Rank::Pipe::reader() int rc = tcp_read( sd, (char*)&seq, sizeof(seq)); lock.Lock(); if (rc < 0) { - dout(20) << "reader couldn't read ack seq" << dendl; + dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl; fault(); } else { dout(15) << "reader got ack seq " << seq << dendl; @@ -1082,7 +1087,7 @@ void Rank::Pipe::reader() dout(20) << "reader got MSG" << dendl; Message *m = read_message(); if (!m) { - derr(10) << "reader read null message" << dendl; + derr(2) << "reader read null message, " << strerror(errno) << dendl; lock.Lock(); fault(); continue; @@ -1131,7 +1136,7 @@ void Rank::Pipe::reader() else if (tag == CEPH_MSGR_TAG_CLOSE) { dout(20) << "reader got CLOSE" << dendl; lock.Lock(); - fault(); // treat as a fault; i.e. reconnect|close + fault(true); // treat as a fault; i.e. reconnect|close continue; } else { @@ -1161,7 +1166,17 @@ void Rank::Pipe::reader() } } - +/* +class FakeSocketError : public Context { + int sd; +public: + FakeSocketError(int s) : sd(s) {} + void finish(int r) { + cout << "faking socket error on " << sd << std::endl; + ::close(sd); + } +}; +*/ /* write msgs to socket. * also, client. @@ -1198,7 +1213,7 @@ void Rank::Pipe::writer() int rc = write_ack(send_seq); lock.Lock(); if (rc < 0) { - dout(20) << "writer couldn't write ack" << dendl; + dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl; fault(); continue; } @@ -1221,6 +1236,18 @@ void Rank::Pipe::writer() << errno << ": " << strerror(errno) << dendl; fault(); } + + /* + // HACK + static bool did = false; + if (!did && + m->get_source() == entity_name_t::OSD(0) && + m->get_dest() == entity_name_t::OSD(1)) { + did = true; + dout(0) << "will fake socket error on " << sd << dendl; + g_timer.add_event_after(15.0, new FakeSocketError(sd)); + } + */ } continue; } diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index 4ef9144e343ca..af2de93c2b134 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -109,7 +109,7 @@ private: int do_sendmsg(int sd, struct msghdr *msg, int len); int write_ack(unsigned s); - void fault(); + void fault(bool silent=false); void fail(); void take_queue(list& ls) { -- 2.39.5