]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: tolerate incoming seq #'s that skip ahead
authorSage Weil <sage@newdream.net>
Wed, 12 May 2010 04:29:47 +0000 (21:29 -0700)
committerSage Weil <sage@newdream.net>
Wed, 12 May 2010 04:29:47 +0000 (21:29 -0700)
This is necessary because the kclient may pull messages out of the out/sent
queues, and we can't renumber previously sent (and possibly received)
messages without breaking things entirely.

src/msg/SimpleMessenger.cc

index 18ed42a0c8341092681406aac6899eb8cd2903d4..c0caff0067b97ba4bbaf0517f57e294cf726ba7b 100644 (file)
@@ -1485,7 +1485,11 @@ void SimpleMessenger::Pipe::reader()
 
       m->set_connection(connection_state->get());
 
-      // check received seq#
+      // check received seq#.  if it is old, drop the message.  
+      // note that incoming messages may skip ahead.  this is convenient for the client
+      // side queueing because messages can't be renumbered, but the (kernel) client will
+      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
+      // it doesn't matter if we "got" it or not.
       if (m->get_seq() <= in_seq) {
        dout(-10) << "reader got old message "
                  << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
@@ -1493,18 +1497,9 @@ void SimpleMessenger::Pipe::reader()
        delete m;
        continue;
       }
-      in_seq++;
-
-      if (!policy.lossy && in_seq != m->get_seq()) {
-       dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
-               << " for " << *m << " from " << m->get_source() << dendl;
-       derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
-               << " for " << *m << " from " << m->get_source() << dendl;
-       assert(in_seq == m->get_seq()); // for now!
-       delete m;
-       fault(false, true);
-       continue;
-      }
+
+      // note last received message.
+      in_seq = m->get_seq();
 
       cond.Signal();  // wake up writer, to ack this
       pipe_lock.Unlock();