]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
DispatchQueue: track queued message arrival times and expose oldest
authorSamuel Just <sam.just@inktank.com>
Mon, 22 Apr 2013 21:06:05 +0000 (14:06 -0700)
committerSamuel Just <sam.just@inktank.com>
Wed, 24 Apr 2013 01:27:28 +0000 (18:27 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h

index 31f37cfd5f6384ac97a8b428a5e9d8708f1bc553..67bb52e6c7b1758785dc2ab97134d7bfe3d5f814 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " "
 
+double DispatchQueue::get_max_age(utime_t now) {
+  Mutex::Locker l(lock);
+  if (marrival.empty())
+    return 0;
+  else
+    return (now - marrival.begin()->first);
+}
+
 void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
 {
   Mutex::Locker l(lock);
   ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
+  add_arrival(m);
   if (priority >= CEPH_MSG_PRIO_LOW) {
     mqueue.enqueue_strict(
       id, priority, QueueItem(m));
@@ -46,6 +55,7 @@ void DispatchQueue::local_delivery(Message *m, int priority)
 {
   Mutex::Locker l(lock);
   m->set_connection(msgr->local_connection->get());
+  add_arrival(m);
   if (priority >= CEPH_MSG_PRIO_LOW) {
     mqueue.enqueue_strict(
       0, priority, QueueItem(m));
@@ -71,6 +81,8 @@ void DispatchQueue::entry()
   while (!stop) {
     while (!mqueue.empty() && !stop) {
       QueueItem qitem = mqueue.dequeue();
+      if (!qitem.is_code())
+       remove_arrival(qitem.get_message());
       lock.Unlock();
 
       if (qitem.is_code()) {
@@ -128,6 +140,7 @@ void DispatchQueue::discard_queue(uint64_t id) {
        ++i) {
     assert(!(i->is_code())); // We don't discard id 0, ever!
     Message *m = i->get_message();
+    remove_arrival(m);
     msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
     m->put();
   }
index 884e0269342628981ba8c6fb64bfcca7d531b307..82d6ba12c8d0bba17572992b70001e1724dbb7ef 100644 (file)
@@ -70,6 +70,25 @@ class DispatchQueue {
   Cond cond;
 
   PrioritizedQueue<QueueItem, uint64_t> mqueue;
+
+  set<pair<double, Message*> > marrival;
+  map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
+  void add_arrival(Message *m) {
+    marrival_map.insert(
+      make_pair(
+       m,
+       marrival.insert(make_pair(m->get_recv_stamp(), m)).first
+       )
+      );
+  }
+  void remove_arrival(Message *m) {
+    map<Message *, set<pair<double, Message*> >::iterator>::iterator i =
+      marrival_map.find(m);
+    assert(i != marrival_map.end());
+    marrival.erase(i->second);
+    marrival_map.erase(i);
+  }
+
   uint64_t next_pipe_id;
     
   enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
@@ -91,6 +110,8 @@ class DispatchQueue {
   bool stop;
   void local_delivery(Message *m, int priority);
 
+  double get_max_age(utime_t now);
+
   int get_queue_len() {
     Mutex::Locker l(lock);
     return mqueue.length();