From d28c98f4036e8f9b4ed06f22c4ff082e196e6e91 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 18 Mar 2008 15:49:13 -0700 Subject: [PATCH] msgr: reworked accept a bit to more closely mirror pseudocode in wiki --- src/msg/SimpleMessenger.cc | 52 ++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 803dd1b48e756..ca1df55cc3e56 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -789,7 +789,11 @@ int Rank::Pipe::accept() } __u32 peer_cseq; + Pipe *existing = 0; + // this should roughly mirror pseudocode at + // http://ceph.newdream.net/wiki/Messaging_protocol + while (1) { rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq)); if (rc < 0) { @@ -802,7 +806,7 @@ int Rank::Pipe::accept() // existing? if (rank.rank_pipe.count(peer_addr)) { - Pipe *existing = rank.rank_pipe[peer_addr]; + existing = rank.rank_pipe[peer_addr]; existing->lock.Lock(); if (peer_cseq < existing->connect_seq) { @@ -814,7 +818,7 @@ int Rank::Pipe::accept() // old attempt, or we sent READY but they didn't get it. dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq << " > " << peer_cseq << ", RETRY" << dendl; - connect_seq = existing->connect_seq; + connect_seq = existing->connect_seq; // so we can send it below.. existing->lock.Unlock(); rank.lock.Unlock(); char tag = CEPH_MSGR_TAG_RETRY; @@ -864,30 +868,13 @@ int Rank::Pipe::accept() continue; } else { // reconnect - dout(10) << "accept peer sent cseq " << peer_cseq << " > " << existing->connect_seq << dendl; + dout(10) << "accept peer sent cseq " << peer_cseq + << " > " << existing->connect_seq << dendl; goto replace; } assert(0); - - replace: - dout(10) << "accept replacing " << existing << dendl; - existing->state = STATE_CLOSED; - existing->cond.Signal(); - existing->unregister_pipe(); - - // steal queue and out_seq - out_seq = existing->out_seq; - if (!existing->sent.empty()) { - out_seq = existing->sent.front()->get_seq()-1; - q.splice(q.begin(), existing->sent); - } - q.splice(q.end(), existing->q); - - existing->lock.Unlock(); - break; - } - - if (peer_cseq > 0) { + } // existing + else if (peer_cseq > 0) { // we reset, and they are opening a new session dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl; rank.lock.Unlock(); @@ -898,11 +885,28 @@ int Rank::Pipe::accept() } else { // new session dout(10) << "accept new session" << dendl; - break; + goto open; } assert(0); } + replace: + dout(10) << "accept replacing " << existing << dendl; + existing->state = STATE_CLOSED; + existing->cond.Signal(); + existing->unregister_pipe(); + + // steal queue and out_seq + out_seq = existing->out_seq; + if (!existing->sent.empty()) { + out_seq = existing->sent.front()->get_seq()-1; + q.splice(q.begin(), existing->sent); + } + q.splice(q.end(), existing->q); + + existing->lock.Unlock(); + + open: // open register_pipe(); rank.lock.Unlock(); -- 2.39.5