#undef dout_prefix
#define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " "
+double DispatchQueue::get_max_age(utime_t now) {
+ Mutex::Locker l(lock);
+ if (marrival.empty())
+ return 0;
+ else
+ return (now - marrival.begin()->first);
+}
+
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
Mutex::Locker l(lock);
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
+ add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(
id, priority, QueueItem(m));
{
Mutex::Locker l(lock);
m->set_connection(msgr->local_connection->get());
+ add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(
0, priority, QueueItem(m));
while (!stop) {
while (!mqueue.empty() && !stop) {
QueueItem qitem = mqueue.dequeue();
+ if (!qitem.is_code())
+ remove_arrival(qitem.get_message());
lock.Unlock();
if (qitem.is_code()) {
++i) {
assert(!(i->is_code())); // We don't discard id 0, ever!
Message *m = i->get_message();
+ remove_arrival(m);
msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
}
Cond cond;
PrioritizedQueue<QueueItem, uint64_t> mqueue;
+
+ set<pair<double, Message*> > marrival;
+ map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
+ void add_arrival(Message *m) {
+ marrival_map.insert(
+ make_pair(
+ m,
+ marrival.insert(make_pair(m->get_recv_stamp(), m)).first
+ )
+ );
+ }
+ void remove_arrival(Message *m) {
+ map<Message *, set<pair<double, Message*> >::iterator>::iterator i =
+ marrival_map.find(m);
+ assert(i != marrival_map.end());
+ marrival.erase(i->second);
+ marrival_map.erase(i);
+ }
+
uint64_t next_pipe_id;
enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
bool stop;
void local_delivery(Message *m, int priority);
+ double get_max_age(utime_t now);
+
int get_queue_len() {
Mutex::Locker l(lock);
return mqueue.length();