existing->lock.Lock();
if (peer_cseq < existing->connect_seq) {
- // old attempt, or we sent READY but they didn't get it.
- dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
- << " > " << peer_cseq << ", RETRY" << dendl;
- existing->lock.Unlock();
- rank.lock.Unlock();
- char tag = CEPH_MSGR_TAG_RETRY;
- if (tcp_write(sd, &tag, 1) < 0)
- goto fail;
- continue;
- }
-
- if ((peer_cseq == existing->connect_seq && peer_addr < rank.rank_addr) ||
- (peer_cseq > existing->connect_seq)) {
- // connection race, incoming wins; or
- // reconnect
- dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
- << " <= " << peer_cseq << ", replacing" << dendl;
- assert(existing->state == STATE_CONNECTING ||
- existing->state == STATE_WAIT);
- existing->state = STATE_CLOSED;
- existing->cond.Signal();
- existing->unregister_pipe();
-
- // steal queue and out_seq
- out_seq = existing->out_seq;
- if (!existing->sent.empty()) {
- out_seq = existing->sent.front()->get_seq()-1;
- q.splice(q.begin(), existing->sent);
+ if (peer_cseq == 0) {
+ dout(10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
+ existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
+ goto replace;
+ } else {
+ // old attempt, or we sent READY but they didn't get it.
+ dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+ << " > " << peer_cseq << ", RETRY" << dendl;
+ existing->lock.Unlock();
+ rank.lock.Unlock();
+ char tag = CEPH_MSGR_TAG_RETRY;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
+ if (tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0)
+ goto fail;
+ continue;
}
- q.splice(q.end(), existing->q);
-
- existing->lock.Unlock();
- break;
}
-
+
if (peer_cseq == existing->connect_seq) {
- // connection race, our outgoing wins
- dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
- << " == " << peer_cseq << ", sending WAIT" << dendl;
- assert(peer_addr > rank.rank_addr);
- assert(existing->state == STATE_CONNECTING);
- existing->lock.Unlock();
+ // connection race
+ if (peer_addr < rank.rank_addr) {
+ // incoming wins
+ dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+ << " == " << peer_cseq << ", replacing my attempt" << dendl;
+ assert(existing->state == STATE_CONNECTING ||
+ existing->state == STATE_WAIT);
+ goto replace;
+ } else {
+ // our existing outgoing wins
+ dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+ << " == " << peer_cseq << ", sending WAIT" << dendl;
+ assert(peer_addr > rank.rank_addr);
+ assert(existing->state == STATE_CONNECTING); // this will win
+ existing->lock.Unlock();
+ rank.lock.Unlock();
+
+ char tag = CEPH_MSGR_TAG_WAIT;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
+ continue;
+ }
+ }
+
+ assert(peer_cseq > existing->connect_seq);
+ if (existing->connect_seq == 0) {
+ dout(10) << "accept we reset (peer sent cseq " << peer_cseq
+ << ", " << existing << ".cseq = " << existing->connect_seq
+ << "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
-
- char tag = CEPH_MSGR_TAG_WAIT;
+ existing->lock.Unlock();
+ char tag = CEPH_MSGR_TAG_RESETSESSION;
if (tcp_write(sd, &tag, 1) < 0)
goto fail;
continue;
+ } else {
+ // reconnect
+ dout(10) << "accept peer sent cseq " << peer_cseq << " > " << existing->connect_seq << dendl;
+ goto replace;
}
-
assert(0);
+
+ replace:
+ dout(10) << "accept replacing " << existing << dendl;
+ existing->state = STATE_CLOSED;
+ existing->cond.Signal();
+ existing->unregister_pipe();
+
+ // steal queue and out_seq
+ out_seq = existing->out_seq;
+ if (!existing->sent.empty()) {
+ out_seq = existing->sent.front()->get_seq()-1;
+ q.splice(q.begin(), existing->sent);
+ }
+ q.splice(q.end(), existing->q);
+
+ existing->lock.Unlock();
+ break;
}
if (peer_cseq > 0) {
- // we reset, and are opening a new session
+ // we reset, and they are opening a new session
dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RESETSESSION;
dout(10) << "accept new session" << dendl;
break;
}
- assert(0);
+ assert(0);
}
- // okay!
+ // open
register_pipe();
rank.lock.Unlock();
dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
goto fail;
}
- dout(20) << "connect read peer addr " << paddr << dendl;
+ dout(20) << "connect read peer addr " << paddr << " on socket " << newsd << dendl;
if (!peer_addr.is_local_to(paddr)) {
dout(0) << "connect peer identifies itself as "
<< paddr << "... wrong node!" << dendl;
}
dout(0) << "connect got RESETSESSION" << dendl;
- report_failures();
- for (unsigned i=0; i<rank.local.size(); i++)
- if (rank.local[i] && rank.local[i]->get_dispatcher())
- rank.local[i]->queue_remote_reset(peer_addr, last_dest_name);
- // renumber outgoing seqs
- out_seq = 0;
- for (list<Message*>::iterator p = q.begin(); p != q.end(); p++)
- (*p)->set_seq(++out_seq);
- in_seq = 0;
+ was_session_reset();
+ continue;
}
if (tag == CEPH_MSGR_TAG_RETRY) {
int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
dout(10) << "fault already closed" << dendl;
return;
}
+
+ ::close(sd);
+ sd = -1;
+
if (q.empty()) {
if (state == STATE_CLOSING || onconnect) {
dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
dout(0) << "fault nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
}
- ::close(sd);
- sd = -1;
return;
}
rank.local[i]->queue_reset(peer_addr, last_dest_name);
}
+/*
+ * Drop this pipe's per-session numbering after a session reset.
+ *
+ * Calls report_failures(), then queues a remote-reset notification
+ * (peer_addr, last_dest_name) on every entry of rank.local that is non-null
+ * and has a dispatcher.  Messages still waiting in q are renumbered starting
+ * at 1, leaving out_seq equal to the number of queued messages; in_seq and
+ * connect_seq are zeroed.
+ *
+ * NOTE(review): only q is renumbered here; anything held in 'sent' keeps its
+ * previous sequence number -- confirm callers account for that.
+ * NOTE(review): no lock is taken in this function; presumably the caller
+ * already holds the pipe lock -- confirm.
+ */
+void Rank::Pipe::was_session_reset()
+{
+  // trace under the function's real name so log searches find it
+  dout(10) << "was_session_reset" << dendl;
+  report_failures();
+
+  // notify every local entity that has a dispatcher of the remote reset
+  for (unsigned i = 0; i < rank.local.size(); ++i) {
+    if (rank.local[i] && rank.local[i]->get_dispatcher())
+      rank.local[i]->queue_remote_reset(peer_addr, last_dest_name);
+  }
+
+  // renumber outgoing seqs
+  out_seq = 0;
+  for (list<Message*>::iterator p = q.begin(); p != q.end(); ++p)
+    (*p)->set_seq(++out_seq);
+
+  in_seq = 0;
+  connect_seq = 0;
+}
+
void Rank::Pipe::report_failures()
{
// report failures
{
lock.Lock();
- while (state != STATE_CLOSED && state != STATE_WAIT) {
+ while (state != STATE_CLOSED) { // && state != STATE_WAIT) {
// standby?
if (!q.empty() && state == STATE_STANDBY)
state = STATE_CONNECTING;
continue;
}
- if (state != STATE_CONNECTING &&
+ if (state != STATE_CONNECTING && state != STATE_WAIT &&
(!q.empty() || in_seq > in_seq_acked)) {
// send ack?