#include <iostream>
#include <fstream>
+#include "common/Timer.h"
+
#define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " "
#define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " "
if (tcp_write(sd, &tag, 1) < 0 ||
tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0) {
// hrmpf
- dout(10) << "accept couldn't send initial tag+seq: "
+ dout(2) << "accept couldn't send initial tag+seq: "
<< strerror(errno) << dendl;
fault();
}
msglen = msgvec[0].iov_len + msgvec[1].iov_len;
if (do_sendmsg(newsd, &msg, msglen)) {
- dout(20) << "connect couldn't write self, seq" << dendl;
+ dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
goto fail;
}
// wait for tag
tag = -1;
if (tcp_read(newsd, &tag, 1) < 0 ||
- tcp_read(newsd, (char*)&cseq, sizeof(cseq)) < 0)
+ tcp_read(newsd, (char*)&cseq, sizeof(cseq)) < 0) {
+ dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
goto fail;
+ }
dout(20) << "connect got initial tag " << (int)tag << " + seq " << cseq << dendl;
// FINISH
if (state != STATE_CONNECTING) {
- dout(20) << "connect hmm, not connecting anymore, failing" << dendl;
+ dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl;
goto fail2; // hmm!
}
- if (tag != CEPH_MSGR_TAG_READY) {
- dout(20) << "connect didn't get READY tag, my connect_seq=" << connect_seq
- << ", got " << cseq << dendl;
+ if (tag == CEPH_MSGR_TAG_REJECT) {
if (connect_seq != cseq) {
- dout(0) << "connect got REJECT tag, old connect_seq was " << connect_seq
+ dout(0) << "connect got REJECT, old connect_seq was " << connect_seq
<< ", taking new " << cseq << dendl;
connect_seq = cseq;
+ } else {
+ dout(0) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl;
}
goto fail2;
}
+ assert(tag == CEPH_MSGR_TAG_READY);
state = STATE_OPEN;
this->sd = newsd;
connect_seq++;
lock.Lock();
fail2:
if (newsd > 0) ::close(newsd);
- fault();
+ fault(tag == CEPH_MSGR_TAG_REJECT);
return -1;
}
}
}
-void Rank::Pipe::fault()
+void Rank::Pipe::fault(bool silent)
{
assert(lock.is_locked());
if (q.empty()) {
- dout(0) << "fault nothing to send, closing" << dendl;
+ if (!silent) dout(0) << "fault nothing to send, closing" << dendl;
state = STATE_CLOSED;
} else {
utime_t now = g_clock.now();
if (state != STATE_CONNECTING) {
- dout(0) << "fault initiating reconnect" << dendl;
+ if (!silent) dout(0) << "fault initiating reconnect" << dendl;
connect_seq++;
state = STATE_CONNECTING;
first_fault = now;
} else if (first_fault.sec() == 0) {
- dout(0) << "fault during connect" << dendl;
+ if (!silent) dout(0) << "fault during connect" << dendl;
first_fault = now;
} else {
utime_t failinterval = now - first_fault;
utime_t retryinterval = now - last_attempt;
- dout(10) << "fault failure was " << failinterval
- << " ago, last attempt was at " << last_attempt
- << ", " << retryinterval << " ago" << dendl;
+ if (!silent) dout(10) << "fault failure was " << failinterval
+ << " ago, last attempt was at " << last_attempt
+ << ", " << retryinterval << " ago" << dendl;
if (failinterval > g_conf.ms_fail_interval) {
// give up
dout(0) << "fault giving up" << dendl;
int rc = tcp_read(sd, (char*)&tag, 1);
if (rc < 0) {
lock.Lock();
- dout(20) << "reader couldn't read tag" << dendl;
+ dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
fault();
continue;
}
int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
lock.Lock();
if (rc < 0) {
- dout(20) << "reader couldn't read ack seq" << dendl;
+ dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
fault();
} else {
dout(15) << "reader got ack seq " << seq << dendl;
dout(20) << "reader got MSG" << dendl;
Message *m = read_message();
if (!m) {
- derr(10) << "reader read null message" << dendl;
+ derr(2) << "reader read null message, " << strerror(errno) << dendl;
lock.Lock();
fault();
continue;
else if (tag == CEPH_MSGR_TAG_CLOSE) {
dout(20) << "reader got CLOSE" << dendl;
lock.Lock();
- fault(); // treat as a fault; i.e. reconnect|close
+ fault(true); // treat as a fault; i.e. reconnect|close
continue;
}
else {
}
}
-
+/*
+class FakeSocketError : public Context {
+ int sd;
+public:
+ FakeSocketError(int s) : sd(s) {}
+ void finish(int r) {
+ cout << "faking socket error on " << sd << std::endl;
+ ::close(sd);
+ }
+};
+*/
/* write msgs to socket.
* also, client.
int rc = write_ack(send_seq);
lock.Lock();
if (rc < 0) {
- dout(20) << "writer couldn't write ack" << dendl;
+ dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl;
fault();
continue;
}
<< errno << ": " << strerror(errno) << dendl;
fault();
}
+
+ /*
+ // HACK
+ static bool did = false;
+ if (!did &&
+ m->get_source() == entity_name_t::OSD(0) &&
+ m->get_dest() == entity_name_t::OSD(1)) {
+ did = true;
+ dout(0) << "will fake socket error on " << sd << dendl;
+ g_timer.add_event_after(15.0, new FakeSocketError(sd));
+ }
+ */
}
continue;
}