From 07f847c17a26e71751ffa3919ba3fb093d1f0df1 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 18 Nov 2008 15:52:54 -0800 Subject: [PATCH] msgr: fix reconnect after error Items in sent queue weren't being moved back to out queue, and in_seq/out_seq weren't being set properly after an incoming connection replaced an existing connection. --- src/msg/SimpleMessenger.cc | 37 +++++++++++++++++++++++++++---------- src/msg/SimpleMessenger.h | 2 ++ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 60599782b0fbb..7c48bb61b2d8b 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1013,15 +1013,14 @@ int Rank::Pipe::accept() existing->unregister_pipe(); // steal queue and out_seq + existing->requeue_sent(); out_seq = existing->out_seq; - if (!existing->sent.empty()) { - out_seq = existing->sent.front()->get_seq()-1; - q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), existing->sent); - } + in_seq = existing->in_seq; + dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl; for (map >::iterator p = existing->q.begin(); p != existing->q.end(); p++) - q[p->first].splice(q[p->first].end(), p->second); + q[p->first].splice(q[p->first].begin(), p->second); existing->lock.Unlock(); @@ -1308,6 +1307,24 @@ void Rank::Pipe::unregister_pipe() } } + +void Rank::Pipe::requeue_sent() +{ + if (sent.empty()) + return; + + list& rq = q[CEPH_MSG_PRIO_HIGHEST]; + while (!sent.empty()) { + Message *m = sent.back(); + sent.pop_back(); + dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(m); + out_seq--; + } +} + + void Rank::Pipe::fault(bool onconnect, bool onread) { assert(lock.is_locked()); @@ -1339,6 +1356,9 @@ void Rank::Pipe::fault(bool onconnect, bool onread) return; } + // requeue sent items + requeue_sent(); + if (q.empty()) { if (state == STATE_CLOSING || onconnect) { dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl; @@ -1696,6 +1716,7 @@ void Rank::Pipe::writer() Message *m = _get_next_outgoing(); if (m) { m->set_seq(++out_seq); + sent.push_back(m); // move to sent list lock.Unlock(); dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; @@ -1705,14 +1726,10 @@ void Rank::Pipe::writer() m->encode_payload(); m->calc_front_crc(); - lock.Lock(); - sent.push_back(m); // move to sent list - lock.Unlock(); - dout(20) << "writer sending " << m->get_seq() << " " << m << dendl; int rc = write_message(m); + lock.Lock(); - if (rc < 0) { derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", " << errno << ": " << strerror(errno) << dendl; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 512130ad099e5..27374355dc91d 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -245,6 +245,8 @@ private: return m; } + void requeue_sent(); + void force_close() { if (sd >= 0) ::close(sd); } -- 2.39.5