From 55ff866c25377864ad2146e5923d70144d662a86 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 12 Mar 2008 15:29:57 -0700 Subject: [PATCH] msgr: lots of fixes, cleanup. reset detection and races both seem to work --- src/msg/SimpleMessenger.cc | 157 +++++++++++++++++++++++-------------- src/msg/SimpleMessenger.h | 2 + src/osd/OSD.h | 1 + src/start.sh | 2 +- 4 files changed, 102 insertions(+), 60 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index bc179786c0a4e..f767ec42954af 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -807,61 +807,88 @@ int Rank::Pipe::accept() existing->lock.Lock(); if (peer_cseq < existing->connect_seq) { - // old attempt, or we sent READY but they didn't get it. - dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " > " << peer_cseq << ", RETRY" << dendl; - existing->lock.Unlock(); - rank.lock.Unlock(); - char tag = CEPH_MSGR_TAG_RETRY; - if (tcp_write(sd, &tag, 1) < 0) - goto fail; - continue; - } - - if ((peer_cseq == existing->connect_seq && peer_addr < rank.rank_addr) || - (peer_cseq > existing->connect_seq)) { - // connection race, incoming wins; or - // reconnect - dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " <= " << peer_cseq << ", replacing" << dendl; - assert(existing->state == STATE_CONNECTING || - existing->state == STATE_WAIT); - existing->state = STATE_CLOSED; - existing->cond.Signal(); - existing->unregister_pipe(); - - // steal queue and out_seq - out_seq = existing->out_seq; - if (!existing->sent.empty()) { - out_seq = existing->sent.front()->get_seq()-1; - q.splice(q.begin(), existing->sent); + if (peer_cseq == 0) { + dout(10) << "accept peer reset, then tried to connect to us, replacing" << dendl; + existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s + goto replace; + } else { + // old attempt, or we sent READY but they didn't get it. + dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq + << " > " << peer_cseq << ", RETRY" << dendl; + existing->lock.Unlock(); + rank.lock.Unlock(); + char tag = CEPH_MSGR_TAG_RETRY; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; + if (tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0) + goto fail; + continue; } - q.splice(q.end(), existing->q); - - existing->lock.Unlock(); - break; } - + if (peer_cseq == existing->connect_seq) { - // connection race, our outgoing wins - dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " == " << peer_cseq << ", sending WAIT" << dendl; - assert(peer_addr > rank.rank_addr); - assert(existing->state == STATE_CONNECTING); - existing->lock.Unlock(); + // connection race + if (peer_addr < rank.rank_addr) { + // incoming wins + dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + << " == " << peer_cseq << ", replacing my attempt" << dendl; + assert(existing->state == STATE_CONNECTING || + existing->state == STATE_WAIT); + goto replace; + } else { + // our existing outgoing wins + dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + << " == " << peer_cseq << ", sending WAIT" << dendl; + assert(peer_addr > rank.rank_addr); + assert(existing->state == STATE_CONNECTING); // this will win + existing->lock.Unlock(); + rank.lock.Unlock(); + + char tag = CEPH_MSGR_TAG_WAIT; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; + continue; + } + } + + assert(peer_cseq > existing->connect_seq); + if (existing->connect_seq == 0) { + dout(10) << "accept we reset (peer sent cseq " << peer_cseq + << ", " << existing << ".cseq = " << existing->connect_seq + << "), sending RESETSESSION" << dendl; rank.lock.Unlock(); - - char tag = CEPH_MSGR_TAG_WAIT; + existing->lock.Unlock(); + char tag = CEPH_MSGR_TAG_RESETSESSION; if (tcp_write(sd, &tag, 1) < 0) goto fail; continue; + } else { + // reconnect + dout(10) << "accept peer sent cseq " << peer_cseq << " > " << existing->connect_seq << dendl; + goto replace; } - assert(0); + + replace: + dout(10) << "accept replacing " << existing << dendl; + existing->state = STATE_CLOSED; + existing->cond.Signal(); + existing->unregister_pipe(); + + // steal queue and out_seq + out_seq = existing->out_seq; + if (!existing->sent.empty()) { + out_seq = existing->sent.front()->get_seq()-1; + q.splice(q.begin(), existing->sent); + } + q.splice(q.end(), existing->q); + + existing->lock.Unlock(); + break; } if (peer_cseq > 0) { - // we reset, and are opening a new session + // we reset, and they are opening a new session dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl; rank.lock.Unlock(); char tag = CEPH_MSGR_TAG_RESETSESSION; @@ -873,10 +900,10 @@ int Rank::Pipe::accept() dout(10) << "accept new session" << dendl; break; } - assert(0); + assert(0); } - // okay! + // open register_pipe(); rank.lock.Unlock(); @@ -964,7 +991,7 @@ int Rank::Pipe::connect() dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl; goto fail; } - dout(20) << "connect read peer addr " << paddr << dendl; + dout(20) << "connect read peer addr " << paddr << " on socket " << newsd << dendl; if (!peer_addr.is_local_to(paddr)) { dout(0) << "connect peer identifies itself as " << paddr << "... wrong node!" << dendl; @@ -1001,15 +1028,8 @@ int Rank::Pipe::connect() } dout(0) << "connect got RESETSESSION" << dendl; - report_failures(); - for (unsigned i=0; iget_dispatcher()) - rank.local[i]->queue_remote_reset(peer_addr, last_dest_name); - // renumber outgoing seqs - out_seq = 0; - for (list::iterator p = q.begin(); p != q.end(); p++) - (*p)->set_seq(++out_seq); - in_seq = 0; + was_session_reset(); + continue; } if (tag == CEPH_MSGR_TAG_RETRY) { int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq)); @@ -1121,6 +1141,10 @@ void Rank::Pipe::fault(bool onconnect) dout(10) << "fault already closed" << dendl; return; } + + ::close(sd); + sd = -1; + if (q.empty()) { if (state == STATE_CLOSING || onconnect) { dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl; @@ -1129,8 +1153,6 @@ void Rank::Pipe::fault(bool onconnect) dout(0) << "fault nothing to send, going to standby" << dendl; state = STATE_STANDBY; } - ::close(sd); - sd = -1; return; } @@ -1177,6 +1199,23 @@ void Rank::Pipe::fail() rank.local[i]->queue_reset(peer_addr, last_dest_name); } +void Rank::Pipe::was_session_reset() +{ + dout(10) << "was_reset_session" << dendl; + report_failures(); + for (unsigned i=0; iget_dispatcher()) + rank.local[i]->queue_remote_reset(peer_addr, last_dest_name); + + // renumber outgoing seqs + out_seq = 0; + for (list::iterator p = q.begin(); p != q.end(); p++) + (*p)->set_seq(++out_seq); + + in_seq = 0; + connect_seq = 0; +} + void Rank::Pipe::report_failures() { // report failures @@ -1397,7 +1436,7 @@ void Rank::Pipe::writer() { lock.Lock(); - while (state != STATE_CLOSED && state != STATE_WAIT) { + while (state != STATE_CLOSED) { // && state != STATE_WAIT) { // standby? if (!q.empty() && state == STATE_STANDBY) state = STATE_CONNECTING; @@ -1419,7 +1458,7 @@ void Rank::Pipe::writer() continue; } - if (state != STATE_CONNECTING && + if (state != STATE_CONNECTING && state != STATE_WAIT && (!q.empty() || in_seq > in_seq_acked)) { // send ack? diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 7c866fcbed016..06eb75f915a31 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -130,6 +130,8 @@ private: void fault(bool silent=false); void fail(); + void was_session_reset(); + void report_failures(); // threads diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 295fe8d7d8f0f..5cd010edcf45b 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -377,6 +377,7 @@ private: void handle_sub_op_reply(class MOSDSubOpReply *m); void force_remount(); + }; #endif diff --git a/src/start.sh b/src/start.sh index 4466874441d53..3e0b91574804c 100755 --- a/src/start.sh +++ b/src/start.sh @@ -29,7 +29,7 @@ $CEPH_BIN/mkmonfs --clobber mondata/mon0 --mon 0 --monmap .ceph_monmap ARGS="-d --bind $IP -o out --debug_ms 1" # start monitor -$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10 --debug_ms 1 +$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10 # build and inject an initial osd map $CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap -- 2.39.5