return (now - marrival.begin()->first);
}
+uint64_t DispatchQueue::pre_dispatch(Message *m)
+{
+ ldout(cct,1) << "<== " << m->get_source_inst()
+ << " " << m->get_seq()
+ << " ==== " << *m
+ << " ==== " << m->get_payload().length()
+ << "+" << m->get_middle().length()
+ << "+" << m->get_data().length()
+ << " (" << m->get_footer().front_crc << " "
+ << m->get_footer().middle_crc
+ << " " << m->get_footer().data_crc << ")"
+ << " " << m << " con " << m->get_connection()
+ << dendl;
+ uint64_t msize = m->get_dispatch_throttle_size();
+ m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
+ return msize;
+}
+
+void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
+{
+ msgr->dispatch_throttle_release(msize);
+ ldout(cct,20) << "done calling dispatch on " << m << dendl;
+}
+
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
Mutex::Locker l(lock);
ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
m->put();
} else {
- uint64_t msize = m->get_dispatch_throttle_size();
- m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
-
- ldout(cct,1) << "<== " << m->get_source_inst()
- << " " << m->get_seq()
- << " ==== " << *m
- << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
- << "+" << m->get_data().length()
- << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
- << " " << m->get_footer().data_crc << ")"
- << " " << m << " con " << m->get_connection()
- << dendl;
+ uint64_t msize = pre_dispatch(m);
msgr->ms_deliver_dispatch(m);
-
- msgr->dispatch_throttle_release(msize);
-
- ldout(cct,20) << "done calling dispatch on " << m << dendl;
+ post_dispatch(m, msize);
}
}