void DispatchQueue::entry()
{
lock.Lock();
- while (!stop) {
- while (!mqueue.empty() && !stop) {
+ while (true) {
+ while (!mqueue.empty()) {
QueueItem qitem = mqueue.dequeue();
if (!qitem.is_code())
remove_arrival(qitem.get_message());
}
} else {
Message *m = qitem.get_message();
- uint64_t msize = m->get_dispatch_throttle_size();
- m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
-
- ldout(cct,1) << "<== " << m->get_source_inst()
- << " " << m->get_seq()
- << " ==== " << *m
- << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
- << "+" << m->get_data().length()
- << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
- << " " << m->get_footer().data_crc << ")"
- << " " << m << " con " << m->get_connection()
- << dendl;
- msgr->ms_deliver_dispatch(m);
-
- msgr->dispatch_throttle_release(msize);
-
- ldout(cct,20) << "done calling dispatch on " << m << dendl;
+ if (stop) {
+ ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
+ m->put();
+ } else {
+ uint64_t msize = m->get_dispatch_throttle_size();
+ m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
+
+ ldout(cct,1) << "<== " << m->get_source_inst()
+ << " " << m->get_seq()
+ << " ==== " << *m
+ << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
+ << "+" << m->get_data().length()
+ << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
+ << " " << m->get_footer().data_crc << ")"
+ << " " << m << " con " << m->get_connection()
+ << dendl;
+ msgr->ms_deliver_dispatch(m);
+
+ msgr->dispatch_throttle_release(msize);
+
+ ldout(cct,20) << "done calling dispatch on " << m << dendl;
+ }
}
lock.Lock();
}
- if (!stop)
- cond.Wait(lock); //wait for something to be put on queue
+ if (stop)
+ break;
+
+ // wait for something to be put on queue
+ cond.Wait(lock);
}
lock.Unlock();
}