#define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")."
#define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")."
-/*
- * we have to be careful about connection races:
- * A initiates connection
- * B initiates connection
- * B accepts A's connection
- * A rejects B's connection (or vice-versa)
- *
- * this is controlled by whether accept uses the new incoming socket
- * as the new pipe. two cases:
- * old new(incoming)
- * connecting connecting -> use socket initiated by lower address
- * open connecting
- * -> use new socket _only_ if connect_seq matches. that is, the
- * peer reconnected subsequent to the current open socket. if
- * connect_seq _doesn't_ match, it means that it is an 'old' attempt.
- */
-
int Rank::Pipe::accept()
{
dout(10) << "accept" << dendl;
}
dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
}
-
- __u32 cseq;
- rc = tcp_read(sd, (char*)&cseq, sizeof(cseq));
- if (rc < 0) {
- dout(10) << "accept couldn't read connect seq" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
-
- dout(20) << "accept got connect_seq " << cseq << dendl;
-
- __u32 myseq = connect_seq = 1;
+
+ __u32 peer_cseq;
+ connect_seq = 0;
+
+ while (1) {
+ rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
+ if (rc < 0) {
+ dout(10) << "accept couldn't read connect peer_seq" << dendl;
+ goto fail;
+ }
+ dout(20) << "accept got peer_connect_seq " << peer_cseq << dendl;
- // register pipe.
- rank.lock.Lock();
- {
- if (rank.rank_pipe.count(peer_addr) == 0) {
- dout(10) << "accept new peer " << peer_addr << dendl;
- register_pipe();
- } else {
- // hmm!
- Pipe *other = rank.rank_pipe[peer_addr];
- other->lock.Lock();
-
- dout(10) << "accept got connect_seq " << cseq
- << ", existing pipe connect_seq " << other->connect_seq
- << " state " << other->state
- << dendl;
-
- // if open race, low addr's pipe "wins".
- // otherwise, look at connect_seq
- if ((other->state == STATE_CONNECTING && peer_addr < rank.rank_addr) ||
- (other->state == STATE_OPEN && cseq >= other->connect_seq)) {
- dout(10) << "accept already had pipe " << other
- << ", but switching to this new one" << dendl;
- // switch to this new Pipe
- other->state = STATE_CLOSED;
- assert(q.empty());
- other->cond.Signal();
- other->unregister_pipe();
- register_pipe();
+ rank.lock.Lock();
+
+ // existing?
+ if (rank.rank_pipe.count(peer_addr)) {
+ Pipe *existing = rank.rank_pipe[peer_addr];
+ existing->lock.Lock();
+
+ if (peer_cseq < existing->connect_seq) {
+ // old attempt, or we sent READY but they didn't get it.
+ dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+ << " > " << peer_cseq << ", RETRY" << dendl;
+ existing->lock.Unlock();
+ rank.lock.Unlock();
+ char tag = CEPH_MSGR_TAG_RETRY;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
+ continue;
+ }
+
+ if ((peer_cseq == existing->connect_seq && peer_addr < rank.rank_addr) ||
+ (peer_cseq > existing->connect_seq)) {
+ // either a connection race that the incoming socket wins (equal cseq
+ // and the peer has the lower address), or a reconnect (peer cseq is higher)
+ dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+ << " <= " << peer_cseq << ", replacing" << dendl;
+ assert(existing->state == STATE_CONNECTING ||
+ existing->state == STATE_WAIT);
+ existing->state = STATE_CLOSED;
+ existing->cond.Signal();
+ existing->unregister_pipe();
// steal queue and out_seq
- out_seq = other->out_seq;
- if (!other->sent.empty()) {
- out_seq = other->sent.front()->get_seq()-1;
- q.splice(q.begin(), other->sent);
+ out_seq = existing->out_seq;
+ if (!existing->sent.empty()) {
+ out_seq = existing->sent.front()->get_seq()-1;
+ q.splice(q.begin(), existing->sent);
}
- q.splice(q.end(), other->q);
- }
- else {
- dout(10) << "accept already had pipe " << other
- << ", closing this one" << dendl;
- myseq = other->connect_seq;
- state = STATE_CLOSED;
+ q.splice(q.end(), existing->q);
+
+ existing->lock.Unlock();
+ break;
}
- other->lock.Unlock();
- }
+
+ if (peer_cseq == existing->connect_seq) {
+ // connection race, our outgoing wins
+ dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+ << " == " << peer_cseq << ", sending WAIT" << dendl;
+ assert(peer_addr > rank.rank_addr);
+ assert(existing->state == STATE_CONNECTING);
+ existing->lock.Unlock();
+ rank.lock.Unlock();
+
+ char tag = CEPH_MSGR_TAG_WAIT;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
+ continue;
+ }
+
+ assert(0);
+ }
+
+ if (peer_cseq > 0) {
+ // no pipe registered for this peer, yet it sent a nonzero cseq: we reset, so tell the peer to reset its session
+ dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
+ rank.lock.Unlock();
+ char tag = CEPH_MSGR_TAG_RESETSESSION;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
+ continue;
+ } else {
+ // new session
+ dout(10) << "accept new session" << dendl;
+ break;
+ }
+ assert(0);
}
+
+ // okay!
+ register_pipe();
rank.lock.Unlock();
- char tag;
- if (state == STATE_CLOSED) {
- dout(10) << "accept closed, sending REJECT tag" << dendl;
- tag = CEPH_MSGR_TAG_REJECT;
- } else {
- dout(10) << "accept sending READY tag" << dendl;
- tag = CEPH_MSGR_TAG_READY;
- state = STATE_OPEN;
- kick_reader_on_join = true;
- }
+ connect_seq = peer_cseq + 1;
+ dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
- if (tcp_write(sd, &tag, 1) < 0 ||
- tcp_write(sd, (char*)&myseq, sizeof(myseq)) < 0) {
- dout(2) << "accept couldn't send initial tag+seq: "
- << strerror(errno) << dendl;
- fault();
+ // send READY
+ {
+ char tag = CEPH_MSGR_TAG_READY;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
}
if (state != STATE_CLOSED) {
- dout(10) << "accept starting writer, "
- << "state=" << state << dendl;
+ dout(10) << "accept starting writer, " << "state=" << state << dendl;
start_writer();
}
-
dout(20) << "accept done" << dendl;
return 0; // success.
+
+
+ fail:
+ state = STATE_CLOSED;
+ fault();
+ return -1;
}
int Rank::Pipe::connect()
sd = -1;
}
__u32 cseq = connect_seq;
- __u32 rseq;
lock.Unlock();
-
+
int newsd;
char tag = -1;
int rc;
struct msghdr msg;
struct iovec msgvec[2];
int msglen;
-
+
// create socket?
newsd = ::socket(AF_INET, SOCK_STREAM, 0);
if (newsd < 0) {
myAddr.sin_family = AF_INET;
myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
myAddr.sin_port = htons( 0 );
-
rc = ::bind(newsd, (struct sockaddr *) &myAddr, sizeof(myAddr));
assert(rc>=0);
goto fail;
}
- // identify myself, and send open seq
+ // identify myself, and send initial cseq
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&rank.rank_addr;
msgvec[0].iov_len = sizeof(rank.rank_addr);
msg.msg_iovlen = 2;
msglen = msgvec[0].iov_len + msgvec[1].iov_len;
- if (do_sendmsg(newsd, &msg, msglen)) {
- dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
- goto fail;
- }
-
- dout(20) << "connect wrote self, seq, waiting for tag" << dendl;
-
- // wait for tag
- if (tcp_read(newsd, &tag, 1) < 0 ||
- tcp_read(newsd, (char*)&rseq, sizeof(rseq)) < 0) {
- dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
- goto fail;
- }
-
- dout(20) << "connect got initial tag " << (int)tag << " + seq " << rseq << dendl;
+ while (1) {
+ if (do_sendmsg(newsd, &msg, msglen)) {
+ dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl;
+ if (tcp_read(newsd, &tag, 1) < 0) {
+ dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ dout(20) << "connect got tag " << (int)tag << dendl;
- lock.Lock();
+ if (tag == CEPH_MSGR_TAG_RESETSESSION) {
+ lock.Lock();
+ if (state != STATE_CONNECTING) {
+ dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
+ goto stop_locked;
+ }
- // FINISH
- if (state != STATE_CONNECTING) {
- dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl;
- goto fail_locked; // hmm!
- }
- if (tag == CEPH_MSGR_TAG_REJECT) {
- if (connect_seq != rseq) {
- dout(0) << "connect got REJECT, old connect_seq was " << connect_seq
- << ", taking new " << rseq << dendl;
- connect_seq = rseq;
- } else {
- dout(10) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl;
- }
- goto fail_locked;
- }
- assert(tag == CEPH_MSGR_TAG_READY);
- state = STATE_OPEN;
- this->sd = newsd;
- connect_seq++;
- if (rseq != connect_seq) {
- dout(0) << "connect REMOTE RESET: my seq = " << connect_seq << ", remote seq = " << rseq << dendl;
- if (rseq < connect_seq) {
- connect_seq = rseq;
+ dout(0) << "connect got RESETSESSION" << dendl;
report_failures();
for (unsigned i=0; i<rank.local.size(); i++)
if (rank.local[i] && rank.local[i]->get_dispatcher())
for (list<Message*>::iterator p = q.begin(); p != q.end(); p++)
(*p)->set_seq(++out_seq);
in_seq = 0;
- } else {
- dout(0) << "WTF" << dendl;
- assert(0);
}
- }
- first_fault = last_attempt = utime_t();
- dout(20) << "connect success " << connect_seq << dendl;
+ if (tag == CEPH_MSGR_TAG_RETRY) {
+ int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
+ if (rc < 0) {
+ dout(0) << "connect got RETRY tag but couldn't read cseq" << dendl;
+ goto fail;
+ }
+ lock.Lock();
+ if (state != STATE_CONNECTING) {
+ dout(0) << "connect got RETRY, but connection race or something, failing" << dendl;
+ goto stop_locked;
+ }
+ assert(cseq > connect_seq);
+ dout(10) << "connect got RETRY " << connect_seq << " -> " << cseq << dendl;
+ connect_seq = cseq;
+ }
- if (!reader_running) {
- dout(20) << "connect starting reader" << dendl;
- start_reader();
+ if (tag == CEPH_MSGR_TAG_RESETSESSION ||
+ tag == CEPH_MSGR_TAG_RETRY) {
+ // retry
+ lock.Unlock();
+ memset(&msg, 0, sizeof(msg));
+ msgvec[0].iov_base = (char*)&cseq;
+ msgvec[0].iov_len = sizeof(cseq);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
+ continue;
+ }
+
+ if (tag == CEPH_MSGR_TAG_WAIT) {
+ lock.Lock();
+ if (state == STATE_CONNECTING) {
+ dout(3) << "connect got WAIT (connection race), will wait" << dendl;
+ state = STATE_WAIT;
+ } else {
+ dout(3) << "connect got WAIT (connection race), and lo, the wait is already over" << dendl;
+ }
+ goto stop_locked;
+ }
+
+ if (tag == CEPH_MSGR_TAG_READY) {
+ lock.Lock();
+ if (state != STATE_CONNECTING) {
+ dout(3) << "connect got READY but no longer connecting?" << dendl;
+ goto stop_locked;
+ }
+
+ // hooray!
+ state = STATE_OPEN;
+ sd = newsd;
+ connect_seq = cseq+1;
+ first_fault = last_attempt = utime_t();
+ dout(20) << "connect success " << connect_seq << dendl;
+
+ if (!reader_running) {
+ dout(20) << "connect starting reader" << dendl;
+ start_reader();
+ }
+ return 0;
+ }
+
+ // protocol error
+ dout(0) << "connect got bad tag " << (int)tag << dendl;
+ goto fail;
}
- return 0;
fail:
lock.Lock();
- fail_locked:
- if (newsd >= 0) ::close(newsd);
- fault(tag == CEPH_MSGR_TAG_REJECT); // quiet if reject (not socket error)
+ if (state == STATE_CONNECTING)
+ fault();
+ else
+ dout(3) << "connect fault, but state != connecting, stopping" << dendl;
+
+ stop_locked:
+ if (newsd >= 0)
+ ::close(newsd);
return -1;
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
dout(20) << "reader got CLOSE" << dendl;
lock.Lock();
- fault(true); // treat as a fault; i.e. reconnect|close
- continue;
+ if (state == STATE_CLOSING)
+ state = STATE_CLOSED;
+ else
+ state = STATE_CLOSING;
+ cond.Signal();
+ break;
}
else {
dout(0) << "reader bad tag " << (int)tag << dendl;
}
rank.lock.Unlock();
}
+
+ dout(10) << "reader done" << dendl;
}
/*
{
lock.Lock();
- while (state != STATE_CLOSED) {
+ while (state != STATE_CLOSED && state != STATE_WAIT) {
// standby?
if (!q.empty() && state == STATE_STANDBY)
state = STATE_CONNECTING;
if (state == STATE_CLOSING) {
// write close tag
dout(20) << "writer writing CLOSE tag" << dendl;
- char c = CEPH_MSGR_TAG_CLOSE;
+ char tag = CEPH_MSGR_TAG_CLOSE;
+ state = STATE_CLOSED;
lock.Unlock();
- if (sd) ::write(sd, &c, 1);
+ if (sd) ::write(sd, &tag, 1);
lock.Lock();
- state = STATE_CLOSED;
continue;
}
}
rank.lock.Unlock();
}
+
+ dout(10) << "writer done" << dendl;
}
}
// unmarshall message
+ dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from "
+ << env.src << dendl;
+
Message *m = decode_message(env, front, data);
- dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from "
- << m->get_source() << dendl;
return m;
}