}
__u32 peer_cseq;
+ Pipe *existing = 0;
+ // this should roughly mirror pseudocode at
+ // http://ceph.newdream.net/wiki/Messaging_protocol
+
while (1) {
rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
if (rc < 0) {
// existing?
if (rank.rank_pipe.count(peer_addr)) {
- Pipe *existing = rank.rank_pipe[peer_addr];
+ existing = rank.rank_pipe[peer_addr];
existing->lock.Lock();
if (peer_cseq < existing->connect_seq) {
// old attempt, or we sent READY but they didn't get it.
dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
<< " > " << peer_cseq << ", RETRY" << dendl;
- connect_seq = existing->connect_seq;
+ connect_seq = existing->connect_seq; // so we can send it below..
existing->lock.Unlock();
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RETRY;
continue;
} else {
// reconnect
- dout(10) << "accept peer sent cseq " << peer_cseq << " > " << existing->connect_seq << dendl;
+ dout(10) << "accept peer sent cseq " << peer_cseq
+ << " > " << existing->connect_seq << dendl;
goto replace;
}
assert(0);
-
- replace:
- dout(10) << "accept replacing " << existing << dendl;
- existing->state = STATE_CLOSED;
- existing->cond.Signal();
- existing->unregister_pipe();
-
- // steal queue and out_seq
- out_seq = existing->out_seq;
- if (!existing->sent.empty()) {
- out_seq = existing->sent.front()->get_seq()-1;
- q.splice(q.begin(), existing->sent);
- }
- q.splice(q.end(), existing->q);
-
- existing->lock.Unlock();
- break;
- }
-
- if (peer_cseq > 0) {
+ } // existing
+ else if (peer_cseq > 0) {
// we reset, and they are opening a new session
dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
} else {
// new session
dout(10) << "accept new session" << dendl;
- break;
+ goto open;
}
assert(0);
}
+ replace:
+ dout(10) << "accept replacing " << existing << dendl;
+ existing->state = STATE_CLOSED;
+ existing->cond.Signal();
+ existing->unregister_pipe();
+
+ // steal queue and out_seq
+ out_seq = existing->out_seq;
+ if (!existing->sent.empty()) {
+ out_seq = existing->sent.front()->get_seq()-1;
+ q.splice(q.begin(), existing->sent);
+ }
+ q.splice(q.end(), existing->q);
+
+ existing->lock.Unlock();
+
+ open:
// open
register_pipe();
rank.lock.Unlock();