git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: do all recovery operations in dedicated recovery thread
author Sage Weil <sage@newdream.net>
Thu, 30 Oct 2008 23:21:33 +0000 (16:21 -0700)
committer Sage Weil <sage@newdream.net>
Thu, 30 Oct 2008 23:21:53 +0000 (16:21 -0700)
src/osd/OSD.cc
src/osd/OSD.h

index 113141b3919aba5b2e924cd2158644741e9eb0af..56305b113179067931ac6f4f00e1642d05d99435 100644 (file)
@@ -260,7 +260,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) :
   read_latency_calc(g_conf.osd_max_opq<1 ? 1:g_conf.osd_max_opq),
   qlen_calc(3),
   iat_averager(g_conf.osd_flash_crowd_iat_alpha),
-  snap_trimmer_thread(this)
+  snap_trimmer_thread(this),
+  recovery_ops_active(0), recovery_stop(false), recovery_thread(this)
 {
   messenger = m;
   monmap = mm;
@@ -412,6 +413,8 @@ int OSD::init()
   booting = true;
   do_mon_report();     // start mon report timer
   
+  recovery_thread.create();
+
   // start the heartbeat
   timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this));
 
@@ -434,9 +437,13 @@ int OSD::shutdown()
   timer.cancel_all();
   timer.join();
 
-  // flush data to disk
   osd_lock.Unlock();
+
+  // flush data to disk
   store->sync();
+
+  stop_recovery_thread();
+
   osd_lock.Lock();
 
   // finish ops
@@ -830,7 +837,7 @@ void OSD::_refresh_my_stat(utime_t now)
     logger->fset("rdlatm", my_stat.read_latency_mine);
     logger->fset("fshdin", my_stat.frac_rd_ops_shed_in);
     logger->fset("fshdout", my_stat.frac_rd_ops_shed_out);
-    dout(12) << "_refresh_my_stat " << my_stat << dendl;
+    dout(30) << "_refresh_my_stat " << my_stat << dendl;
 
     stat_rd_ops = 0;
     stat_rd_ops_shed_in = 0;
@@ -1029,7 +1036,7 @@ void OSD::do_mon_report()
   }
   pg_stat_queue_lock.Lock();
   if (!pg_stat_pending.empty() || osd_stat_pending) {
-    dout(10) << "requeueing pg_stat_pending" << dendl;
+    dout(30) << "requeueing pg_stat_pending" << dendl;
     retry = true;
     osd_stat_updated = osd_stat_updated || osd_stat_pending;
     osd_stat_pending = false;
@@ -1117,7 +1124,7 @@ void OSD::send_pg_stats()
 {
   assert(osd_lock.is_locked());
 
-  dout(10) << "send_pg_stats" << dendl;
+  dout(20) << "send_pg_stats" << dendl;
 
   // grab queue
   assert(pg_stat_pending.empty());
@@ -2833,60 +2840,70 @@ void OSD::queue_for_recovery(PG *pg)
   if (g_conf.osd_recovery_delay_start > 0) {
     defer_recovery_until = g_clock.now();
     defer_recovery_until += g_conf.osd_recovery_delay_start;
-  }
-
-  recovery_lock.Unlock();  
-
-  if (defer_recovery_until == utime_t())
-    maybe_start_recovery();
-  else
     timer.add_event_at(defer_recovery_until, new C_StartRecovery(this));
+  } else if (_recover_now())
+    recovery_cond.Signal();
+  recovery_lock.Unlock();  
 }
 
-void OSD::maybe_start_recovery()
+bool OSD::_recover_now()   // should more recovery ops be started now? callers in this diff hold recovery_lock
 {
-  if (g_clock.now() > defer_recovery_until &&
-      recovery_ops_active == 0) {
-    dout(10) << "maybe_start_recovery starting" << dendl;
-    do_recovery();
+  if (recovering_pgs.empty())   // nothing is queued
+    return false;
+
+  if (recovery_ops_active >= g_conf.osd_recovery_max_active) {   // op budget is already used up
+    dout(15) << "_recover_now max " << g_conf.osd_recovery_max_active << " active" << dendl;
+    return false;
+  }
+  if (g_clock.now() < defer_recovery_until) {   // hold off only while the deferral deadline is still ahead
+    dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
+    return false;
   }
-}
 
-void OSD::do_recovery()
+  return true;
+}
+void OSD::_do_recovery()   // start ops on the front queued pg; entered and left with recovery_lock held
 {
-  while (1) {
-    recovery_lock.Lock();
-    if (recovering_pgs.empty()) {
-      dout(10) << "do_recovery -- no recoverying pgs, nothing to do" << dendl;
-      recovery_lock.Unlock();
-      return;
-    }
-    if (recovery_ops_active >= g_conf.osd_recovery_max_active) {
-      recovery_lock.Unlock();
-      return;
-    }
-    int max = g_conf.osd_recovery_max_active - recovery_ops_active;
-    
-    PG *pg = recovering_pgs.front();
-    pg->get();
+  assert(recovery_lock.is_locked());
 
-    dout(10) << "do_recovery starting " << max
-            << " (" << recovery_ops_active
-            << "/" << g_conf.osd_recovery_max_active << " active) on "
-            << *pg << dendl;
+  int max = g_conf.osd_recovery_max_active - recovery_ops_active;   // remaining op budget
+  
+  PG *pg = recovering_pgs.front();   // caller must have checked the queue is non-empty (see _recover_now)
+  pg->get();   // hold a ref while recovery_lock is dropped; released by put_unlock() below
+  
+  dout(10) << "do_recovery starting " << max
+          << " (" << recovery_ops_active
+          << "/" << g_conf.osd_recovery_max_active << " active) on "
+          << *pg << dendl;
+  
+  recovery_lock.Unlock();   // drop recovery_lock before taking the pg lock
+  
+  pg->lock();
+  int started = pg->start_recovery_ops(max);
+  pg->recovery_ops_active += started;
+  
+  recovery_lock.Lock();   // retake before touching the shared counter and queue; NOTE(review): assumes pg lock -> recovery_lock nesting is allowed elsewhere -- confirm
+  recovery_ops_active += started;
+  if (started < max)
+    pg->recovery_item.remove_myself();   // pg did not use its whole budget, so take it off the queue
+  pg->put_unlock();
+}
 
-    recovery_lock.Unlock();
-    
-    pg->lock();
-    int started = pg->start_recovery_ops(max);
-    recovery_ops_active += started;
-    pg->recovery_ops_active += started;
-    if (started < max)
-      pg->recovery_item.remove_myself();
-    pg->put_unlock();
+void OSD::recovery_entry()   // body of the recovery thread: run recovery until recovery_stop is set
+{
+  recovery_lock.Lock();
+  dout(10) << "recovery_entry - start" << dendl;
+  while (!recovery_stop) {
+    while (!recovery_stop && _recover_now())
+      _do_recovery();   // drops and retakes recovery_lock, so recovery_stop may be set meanwhile
+    if (!recovery_stop) recovery_cond.Wait(recovery_lock);   // re-test under the lock so a stop signal sent during _do_recovery() is not slept through
   }
+  dout(10) << "recovery_entry - done" << dendl;
+  recovery_lock.Unlock();
 }
 
+
+
 void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
 {
   dout(10) << "finish_recovery_op " << *pg << " count " << count
@@ -2902,11 +2919,8 @@ void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
   else
     recovering_pgs.push_front(&pg->recovery_item);  // requeue
 
+  recovery_cond.Signal();
   recovery_lock.Unlock();
-
-  // continue recovery?
-  if (count && g_clock.now() > defer_recovery_until)
-    do_recovery();
 }
 
 void OSD::defer_recovery(PG *pg)
index 561984cd3d932e3d9db179bf5bf479734b2c9c3d..01d10d4e3e6d8d7301ce7fba235b6081e7ac4ef3 100644 (file)
@@ -425,18 +425,33 @@ private:
 
   // -- pg recovery --
   Mutex recovery_lock;
+  Cond recovery_cond;
   xlist<PG*> recovering_pgs;
   utime_t defer_recovery_until;
   int recovery_ops_active;
+  bool recovery_stop;
 
   Mutex remove_list_lock;
   map<epoch_t, map<int, vector<pg_t> > > remove_list;
 
   void queue_for_recovery(PG *pg);
-  void maybe_start_recovery();
-  void do_recovery();
   void finish_recovery_op(PG *pg, int count, bool more);
   void defer_recovery(PG *pg);
+  void _do_recovery();
+  void recovery_entry();
+  bool _recover_now();
+  void kick_recovery() {   // signal recovery_cond so a waiter re-evaluates its condition
+    recovery_lock.Lock();
+    recovery_cond.Signal();   // signalled while recovery_lock is held
+    recovery_lock.Unlock();
+  }
+  void stop_recovery_thread() {   // ask the recovery thread to exit, then wait for it to finish
+    recovery_lock.Lock();
+    recovery_stop = true;   // flag tested by the loop in recovery_entry()
+    recovery_cond.Signal();
+    recovery_lock.Unlock();
+    recovery_thread.join();   // joined after recovery_lock is released; recovery_entry() takes that lock itself
+  }
 
   void queue_for_removal(int osd, pg_t pgid) {
     remove_list_lock.Lock();
@@ -444,11 +459,20 @@ private:
     remove_list_lock.Unlock();
   }
 
+  struct RecoveryThread : public Thread {   // Thread subclass whose entry() runs OSD::recovery_entry()
+    OSD *osd;   // OSD whose recovery_entry() is invoked
+    RecoveryThread(OSD *o) : osd(o) {}
+    void *entry() {
+      osd->recovery_entry();   // returns only once recovery_entry()'s loop ends
+      return 0;
+    }
+  } recovery_thread;
+
   struct C_StartRecovery : public Context {
     OSD *osd;
     C_StartRecovery(OSD *o) : osd(o) {}
     void finish(int r) {
-      osd->maybe_start_recovery();
+      osd->kick_recovery();   // only signals recovery_cond; r is not used
     }
   };