]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: tolerate incoming seq #'s that skip ahead
authorSage Weil <sage@newdream.net>
Wed, 12 May 2010 04:14:39 +0000 (21:14 -0700)
committerSage Weil <sage@newdream.net>
Wed, 12 May 2010 04:14:39 +0000 (21:14 -0700)
This is necessary because the kclient may pull messages out of the out/sent
queues, and we can't renumber previously sent (and possibly received)
messages without breaking things entirely.

src/msg/SimpleMessenger.cc

index c3d08d59e97dfff9ba712d9478d6ba196c585046..5a9eb02e1181d258ca60be683c51faeee5ca1d7c 100644 (file)
@@ -1503,7 +1503,11 @@ void SimpleMessenger::Pipe::reader()
 
       m->set_connection(connection_state->get());
 
-      // check received seq#
+      // check received seq#.  if it is old, drop the message.  
+      // note that incoming messages may skip ahead.  this is convenient for the client
+      // side queueing because messages can't be renumbered, but the (kernel) client will
+      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
+      // it doesn't matter if we "got" it or not.
       if (m->get_seq() <= in_seq) {
        dout(-10) << "reader got old message "
                  << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
@@ -1511,18 +1515,9 @@ void SimpleMessenger::Pipe::reader()
        m->put();
        continue;
       }
-      in_seq++;
-
-      if (!policy.lossy && in_seq != m->get_seq()) {
-       dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
-               << " for " << *m << " from " << m->get_source() << dendl;
-       derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
-               << " for " << *m << " from " << m->get_source() << dendl;
-       assert(in_seq == m->get_seq()); // for now!
-       m->put();
-       fault(false, true);
-       continue;
-      }
+
+      // note last received message.
+      in_seq = m->get_seq();
 
       cond.Signal();  // wake up writer, to ack this
       pipe_lock.Unlock();