m->set_connection(connection_state->get());
- // check received seq#
+ // check received seq#. if it is old, drop the message.
+ // note that incoming messages may skip ahead. this is convenient for the client
+ // side queueing because messages can't be renumbered, but the (kernel) client will
+ // occasionally pull a message out of the sent queue to send elsewhere. in that case
+ // it doesn't matter if we "got" it or not.
if (m->get_seq() <= in_seq) {
dout(-10) << "reader got old message "
<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
m->put();
continue;
}
- in_seq++;
-
- if (!policy.lossy && in_seq != m->get_seq()) {
- dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
- << " for " << *m << " from " << m->get_source() << dendl;
- derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
- << " for " << *m << " from " << m->get_source() << dendl;
- assert(in_seq == m->get_seq()); // for now!
- m->put();
- fault(false, true);
- continue;
- }
+
+ // note last received message.
+ in_seq = m->get_seq();
cond.Signal(); // wake up writer, to ack this
pipe_lock.Unlock();