]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: requeue instead of draining op_queue on map update
authorSage Weil <sage@newdream.net>
Mon, 26 Jan 2009 23:24:04 +0000 (15:24 -0800)
committerSage Weil <sage@newdream.net>
Tue, 27 Jan 2009 00:21:51 +0000 (16:21 -0800)
Also clean up waiter helpers.

src/osd/OSD.cc
src/osd/OSD.h

index 628deaa53d9181bd19118045e7ad0e92c41a98eb..50a818b63c22162a8097a9242942b68f2836083b 100644 (file)
@@ -1173,26 +1173,7 @@ void OSD::tick()
 
   timer.add_event_after(1.0, new C_Tick(this));
 
-
-  // finishers?
-  finished_lock.Lock();
-  if (finished.empty()) {
-    finished_lock.Unlock();
-  } else {
-    list<Message*> waiting;
-    waiting.splice(waiting.begin(), finished);
-
-    finished_lock.Unlock();
-    osd_lock.Unlock();
-    
-    for (list<Message*>::iterator it = waiting.begin();
-         it != waiting.end();
-         it++) {
-      dispatch(*it);
-    }
-
-    osd_lock.Lock();
-  }
+  do_waiters();
 }
 
 // =========================================
@@ -1495,10 +1476,41 @@ bool OSD::dispatch_impl(Message *m)
     return true;
   }
 
-
   // lock!
   osd_lock.Lock();
-  dout(20) << "dispatch " << m << dendl;
+  _dispatch(m);
+  do_waiters();
+  osd_lock.Unlock();
+  return true;
+}
+
+void OSD::do_waiters()
+{
+  assert(osd_lock.is_locked());
+  
+  finished_lock.Lock();
+  if (finished.empty()) {
+    finished_lock.Unlock();
+  } else {
+    list<Message*> waiting;
+    waiting.splice(waiting.begin(), finished);
+
+    finished_lock.Unlock();
+    
+    dout(2) << "do_waiters -- start" << dendl;
+    for (list<Message*>::iterator it = waiting.begin();
+         it != waiting.end();
+         it++)
+      _dispatch(*it);
+    dout(2) << "do_waiters -- finish" << dendl;
+  }
+}
+
+
+void OSD::_dispatch(Message *m)
+{
+  assert(osd_lock.is_locked());
+  dout(20) << "_dispatch " << m << " " << *m << dendl;
 
   switch (m->get_type()) {
 
@@ -1587,33 +1599,9 @@ bool OSD::dispatch_impl(Message *m)
         handle_sub_op_reply((MOSDSubOpReply*)m);
         break;
         
-        
-      default:
-        return false;
       }
     }
   }
-
-  // finishers?
-  finished_lock.Lock();
-  if (!finished.empty()) {
-    list<Message*> waiting;
-    waiting.splice(waiting.begin(), finished);
-
-    finished_lock.Unlock();
-    osd_lock.Unlock();
-    
-    while (!waiting.empty()) {
-      dout(20) << "doing finished " << waiting.front() << dendl;
-      dispatch(waiting.front());
-      waiting.pop_front();
-    }
-    return true;
-  }
-  
-  finished_lock.Unlock();
-  osd_lock.Unlock();
-  return true;
 }
 
 
@@ -1732,7 +1720,23 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   state = STATE_ACTIVE;
 
-  wait_for_no_ops();
+  // pause, requeue op queue
+  //wait_for_no_ops();
+  op_tp.pause();
+  op_wq.lock();
+  list<Message*> rq;
+  while (!op_queue.empty()) {
+    PG *pg = op_queue.back();
+    op_queue.pop_back();
+    Message *m = pg->op_queue.back();
+    pg->op_queue.pop_back();
+    pg->put();
+    dout(15) << " will requeue " << *m << dendl;
+    rq.push_front(m);
+  }
+  op_wq.unlock();
+  push_waiters(rq);
+
   recovery_tp.pause();
   disk_tp.pause_new();   // _process() may be waiting for a replica message
   map_lock.get_write();
@@ -1928,6 +1932,7 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   map_lock.put_write();
 
+  op_tp.unpause();
   recovery_tp.unpause();
   disk_tp.unpause();
 
index e84a56cc878c1a7a05fd90944e5c2d065b764ff0..6054f9edadd41fd3c80f9e7ef8f90fc22d0e9b01 100644 (file)
@@ -75,6 +75,8 @@ protected:
 
   void tick();
 
+  void _dispatch(Message *m);
+
 public:
   int get_nodeid() { return whoami; }
   
@@ -266,6 +268,12 @@ private:
     finished.splice(finished.end(), ls);
     finished_lock.Unlock();
   }
+  void push_waiters(list<class Message*>& ls) {
+    finished_lock.Lock();
+    finished.splice(finished.begin(), ls);
+    finished_lock.Unlock();
+  }
+  void do_waiters();
   
   // -- op queue --
   deque<PG*> op_queue;