]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: convert recovery to a work queue
author Sage Weil <sage@newdream.net>
Thu, 20 Nov 2008 21:56:36 +0000 (13:56 -0800)
committer Sage Weil <sage@newdream.net>
Thu, 20 Nov 2008 22:48:27 +0000 (14:48 -0800)
src/osd/OSD.cc
src/osd/OSD.h

index e8fe2da42fd906e9a04cfd502ac4ea079335079e..cdd7744215596aafae0bd5941bde4a13c0f64593 100644 (file)
@@ -280,10 +280,9 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) :
   map_cache_lock("OSD::map_cache_lock"),
   pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
   tid_lock("OSD::tid_lock"),
-  recovery_lock("OSD::recovery_lock"),
-  recovery_ops_active(0), recovery_stop(false), recovery_pause(false),
+  recovery_ops_active(0),
+  recovery_wq(this),
   remove_list_lock("OSD::remove_list_lock"),
-  recovery_thread(this),
   snap_trim_wq(this),
   scrub_wq(this)
 {
@@ -451,7 +450,7 @@ int OSD::init()
   booting = true;
   do_mon_report();     // start mon report timer
   
-  recovery_thread.create();
+  recovery_wq.start();
   scrub_wq.start();
   snap_trim_wq.start();
 
@@ -488,9 +487,8 @@ int OSD::shutdown()
   delete threadpool;
   threadpool = 0;
 
-  stop_recovery_thread();
-  dout(10) << "recovery thread stopped" << dendl;
-
+  recovery_wq.stop();
+  dout(10) << "recovery wq stopped" << dendl;
   scrub_wq.stop();
   dout(10) << "scrub wq stopped" << dendl;
   snap_trim_wq.stop();
@@ -667,6 +665,7 @@ void OSD::_remove_unlock_pg(PG *pg)
 
   dout(10) << "_remove_unlock_pg " << pgid << dendl;
 
+  recovery_wq.dequeue(pg);
   snap_trim_wq.dequeue(pg);
   scrub_wq.dequeue(pg);
 
@@ -961,6 +960,9 @@ void OSD::heartbeat()
                            messenger->get_myinst());
     return;
   }
+
+  // periodically kick recovery work queue
+  recovery_wq.kick();
   
   // get CPU load avg
   ifstream in("/proc/loadavg");
@@ -1519,7 +1521,7 @@ void OSD::handle_osd_map(MOSDMap *m)
   booting = boot_pending = false;
 
   wait_for_no_ops();
-  pause_recovery_thread();
+  recovery_wq.pause();
   scrub_wq.pause();
   snap_trim_wq.pause();
   map_lock.get_write();
@@ -1714,7 +1716,7 @@ void OSD::handle_osd_map(MOSDMap *m)
   store->sync();
 
   map_lock.put_write();
-  unpause_recovery_thread();
+  recovery_wq.unpause();
   scrub_wq.unpause();
   snap_trim_wq.unpause();
 
@@ -2938,34 +2940,18 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch)
 }
 
 
-
-void OSD::queue_for_recovery(PG *pg)
+bool OSD::queue_for_recovery(PG *pg)
 {
-  recovery_lock.Lock();
-  if (!pg->recovery_item.get_xlist()) {
-    recovering_pgs.push_back(&pg->recovery_item);
-    pg->get();
-    dout(10) << "queue_for_recovery " << *pg
-            << " -- " << recovering_pgs.size() << " queued" << dendl;
-  } else {
-    dout(10) << "queue_for_recovery " << *pg << " -- already queued" << dendl;
-  }
-  
-  // delay recovery start?
-  if (g_conf.osd_recovery_delay_start > 0) {
-    defer_recovery_until = g_clock.now();
-    defer_recovery_until += g_conf.osd_recovery_delay_start;
-    timer.add_event_at(defer_recovery_until, new C_StartRecovery(this));
-  } else if (_recover_now())
-    recovery_cond.Signal();
-  recovery_lock.Unlock();  
+  bool b = recovery_wq.queue(pg);
+  if (b)
+    dout(10) << "queue_for_recovery queued " << *pg << dendl;
+  else
+    dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+  return b;
 }
 
 bool OSD::_recover_now()
 {
-  if (recovering_pgs.empty())
-    return false;
-
   if (recovery_ops_active >= g_conf.osd_recovery_max_active) {
     dout(15) << "_recover_now max " << g_conf.osd_recovery_max_active << " active" << dendl;
     return false;
@@ -2977,45 +2963,27 @@ bool OSD::_recover_now()
 
   return true;
 }
-void OSD::_do_recovery()
+
+void OSD::do_recovery(PG *pg)
 {
-  assert(recovery_lock.is_locked());
+  pg->lock();
 
   int max = g_conf.osd_recovery_max_active - recovery_ops_active;
-  
-  PG *pg = recovering_pgs.front();
-  pg->get();
-  
   dout(10) << "do_recovery starting " << max
           << " (" << recovery_ops_active
           << "/" << g_conf.osd_recovery_max_active << " active) on "
           << *pg << dendl;
-  
-  recovery_lock.Unlock();
-  
-  pg->lock();
+
   int started = pg->start_recovery_ops(max);
   recovery_ops_active += started;
   pg->recovery_ops_active += started;
   if (started < max)
     pg->recovery_item.remove_myself();
+
   pg->put_unlock();
-  
-  recovery_lock.Lock();
 }
 
-void OSD::recovery_entry()
-{
-  recovery_lock.Lock();
-  dout(10) << "recovery_entry - start" << dendl;
-  while (!recovery_stop) {
-    if (!recovery_pause && _recover_now())
-      _do_recovery();
-    recovery_cond.Wait(recovery_lock);
-  }
-  dout(10) << "recovery_entry - done" << dendl;
-  recovery_lock.Unlock();
-}
 
 
 
@@ -3023,7 +2991,7 @@ void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
 {
   dout(10) << "finish_recovery_op " << *pg << " count " << count
           << " dequeue=" << dequeue << dendl;
-  recovery_lock.Lock();
+  recovery_wq.lock();
 
   // adjust count
   recovery_ops_active -= count;
@@ -3031,22 +2999,25 @@ void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
 
   if (dequeue)
     pg->recovery_item.remove_myself();
-  else
-    recovering_pgs.push_front(&pg->recovery_item);  // requeue
+  else {
+    pg->get();
+    recovery_queue.push_front(&pg->recovery_item);  // requeue
+  }
 
-  recovery_cond.Signal();
-  recovery_lock.Unlock();
+  recovery_wq._kick();
+  recovery_wq.unlock();
 }
 
 void OSD::defer_recovery(PG *pg)
 {
   dout(10) << "defer_recovery " << *pg << dendl;
-  recovery_lock.Lock();
 
   // move pg to the end of the queue...
-  recovering_pgs.push_back(&pg->recovery_item);
-
-  recovery_lock.Unlock();
+  recovery_wq.lock();
+  pg->get();
+  recovery_queue.push_back(&pg->recovery_item);
+  recovery_wq._kick();
+  recovery_wq.unlock();
 }
 
 
index 727962ba11469ca3b8662b4a59c91caabc07a3b6..a17f75d2a8a2930e4397492125ca64c39362547b 100644 (file)
@@ -417,48 +417,54 @@ private:
                        int& created);
 
   // -- pg recovery --
-  Mutex recovery_lock;
-  Cond recovery_cond;
-  xlist<PG*> recovering_pgs;
+  xlist<PG*> recovery_queue;
   utime_t defer_recovery_until;
   int recovery_ops_active;
-  bool recovery_stop;
-  bool recovery_pause;
 
-  Mutex remove_list_lock;
-  map<epoch_t, map<int, vector<pg_t> > > remove_list;
+  struct RecoveryWQ : public WorkQueue<PG> {
+    OSD *osd;
+    RecoveryWQ(OSD *o) : WorkQueue<PG>("OSD::RecoveryWQ"), osd(o) {}
+
+    bool _enqueue(PG *pg) {
+      if (!pg->recovery_item.get_xlist()) {
+       pg->get();
+       osd->recovery_queue.push_back(&pg->recovery_item);
+
+       if (g_conf.osd_recovery_delay_start > 0) {
+         osd->defer_recovery_until = g_clock.now();
+         osd->defer_recovery_until += g_conf.osd_recovery_delay_start;
+       }
+       return true;
+      }
+      return false;
+    }
+    void _dequeue(PG *pg) {
+      pg->recovery_item.remove_myself();
+    }
+    PG * _dequeue() {
+      if (osd->recovery_queue.empty())
+       return NULL;
+      
+      if (!osd->_recover_now())
+       return NULL;
+
+      PG *pg = osd->recovery_queue.front();
+      osd->recovery_queue.pop_front();
+      return pg;
+    }
+    void _process(PG *pg) {
+      osd->do_recovery(pg);
+    }
+  } recovery_wq;
 
-  void queue_for_recovery(PG *pg);
+  bool queue_for_recovery(PG *pg);
   void finish_recovery_op(PG *pg, int count, bool more);
   void defer_recovery(PG *pg);
-  void _do_recovery();
-  void recovery_entry();
+  void do_recovery(PG *pg);
   bool _recover_now();
-  void kick_recovery() {
-    recovery_lock.Lock();
-    recovery_cond.Signal();
-    recovery_lock.Unlock();
-  }
-  void stop_recovery_thread() {
-    osd_lock.Unlock();
-    recovery_lock.Lock();
-    recovery_stop = true;
-    recovery_cond.Signal();
-    recovery_lock.Unlock();
-    recovery_thread.join();
-    osd_lock.Lock();
-  }
-  void pause_recovery_thread() {
-    recovery_lock.Lock();
-    recovery_pause = true;
-    recovery_lock.Unlock();
-  }
-  void unpause_recovery_thread() {
-    recovery_lock.Lock();
-    recovery_pause = false;
-    recovery_cond.Signal();
-    recovery_lock.Unlock();
-  }
+
+  Mutex remove_list_lock;
+  map<epoch_t, map<int, vector<pg_t> > > remove_list;
 
   void queue_for_removal(int osd, pg_t pgid) {
     remove_list_lock.Lock();
@@ -466,23 +472,6 @@ private:
     remove_list_lock.Unlock();
   }
 
-  struct RecoveryThread : public Thread {
-    OSD *osd;
-    RecoveryThread(OSD *o) : osd(o) {}
-    void *entry() {
-      osd->recovery_entry();
-      return 0;
-    }
-  } recovery_thread;
-
-  struct C_StartRecovery : public Context {
-    OSD *osd;
-    C_StartRecovery(OSD *o) : osd(o) {}
-    void finish(int r) {
-      osd->kick_recovery();
-    }
-  };
-  
   void activate_pg(pg_t pgid, epoch_t epoch);
 
   class C_Activate : public Context {
@@ -504,8 +493,11 @@ private:
     OSD *osd;
     SnapTrimWQ(OSD *o) : WorkQueue<PG>("OSD::SnapTrimWQ"), osd(o) {}
 
-    void _enqueue(PG *pg) {
+    bool _enqueue(PG *pg) {
+      if (pg->snap_trim_item.is_on_xlist())
+       return false;
       osd->snap_trim_queue.push_back(&pg->snap_trim_item);
+      return true;
     }
     void _dequeue(PG *pg) {
       pg->snap_trim_item.remove_myself();
@@ -530,8 +522,11 @@ private:
     OSD *osd;
     ScrubWQ(OSD *o) : WorkQueue<PG>("OSD::ScrubWQ"), osd(o) {}
 
-    void _enqueue(PG *pg) {
+    bool _enqueue(PG *pg) {
+      if (pg->scrub_item.is_on_xlist())
+       return false;
       osd->scrub_queue.push_back(&pg->scrub_item);
+      return true;
     }
     void _dequeue(PG *pg) {
       pg->scrub_item.remove_myself();