]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
heartbeatmap: add suicide grace
authorSage Weil <sage@newdream.net>
Mon, 29 Aug 2011 17:04:30 +0000 (10:04 -0700)
committerSage Weil <sage@newdream.net>
Mon, 29 Aug 2011 18:46:15 +0000 (11:46 -0700)
Add a second suicide grace period that will make us kill ourselves if
we are sufficiently catatonic.

Signed-off-by: Sage Weil <sage@newdream.net>
13 files changed:
src/common/HeartbeatMap.cc
src/common/HeartbeatMap.h
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/common/config.cc
src/common/config.h
src/os/FileStore.h
src/osd/OSD.h
src/rados_export.cc
src/rados_import.cc
src/rados_sync.cc
src/rados_sync.h
src/rgw/rgw_main.cc

index 99970e4237ebe8d33d38fb2edfd8ea6c5736bde5..a571859f620367f7dc0d650c8ad4c794dd208013 100644 (file)
@@ -60,29 +60,48 @@ void HeartbeatMap::remove_worker(heartbeat_handle_d *h)
   delete h;
 }
 
-void HeartbeatMap::reset_timeout(heartbeat_handle_d *h, time_t grace)
+bool HeartbeatMap::_check(heartbeat_handle_d *h, const char *who, time_t now)
 {
-  ldout(m_cct, 20) << "reset_timeout '" << h->name << "' grace " << grace << dendl;
-  time_t now = time(NULL);
-  time_t was = h->timeout.read();
+  bool healthy = true;
+  time_t was;
+
+  was = h->timeout.read();
   if (was && was < now) {
-    ldout(m_cct, 1) << "reset_timeout '" << h->name << "'"
+    ldout(m_cct, 1) << who << " '" << h->name << "'"
                    << " had timed out after " << h->grace << dendl;
+    healthy = false;
+  }
+  was = h->suicide_timeout.read();
+  if (was && was < now) {
+    ldout(m_cct, 1) << who << " '" << h->name << "'"
+                   << " had suicide timed out after " << h->suicide_grace << dendl;
+    assert(0 == "hit suicide timeout");
   }
+  return healthy;
+}
+
+void HeartbeatMap::reset_timeout(heartbeat_handle_d *h, time_t grace, time_t suicide_grace)
+{
+  ldout(m_cct, 20) << "reset_timeout '" << h->name << "' grace " << grace
+                  << " suicide " << suicide_grace << dendl;
+  time_t now = time(NULL);
+  _check(h, "reset_timeout", now);
+
   h->timeout.set(now + grace);
   h->grace = grace;
+
+  if (suicide_grace)
+    h->suicide_timeout.set(now + suicide_grace);
+  h->suicide_grace = suicide_grace;
 }
 
 void HeartbeatMap::clear_timeout(heartbeat_handle_d *h)
 {
   ldout(m_cct, 20) << "clear_timeout '" << h->name << "'" << dendl;
   time_t now = time(NULL);
-  time_t was = h->timeout.read();
-  if (was && was < now) {
-    ldout(m_cct, 1) << "clear_timeout '" << h->name << "'"
-                   << " had timed out after " << h->grace << dendl;
-  }
+  _check(h, "clear_timeout", now);
   h->timeout.set(0);
+  h->suicide_timeout.set(0);
 }
 
 bool HeartbeatMap::is_healthy()
@@ -94,10 +113,7 @@ bool HeartbeatMap::is_healthy()
        p != m_workers.end();
        ++p) {
     heartbeat_handle_d *h = *p;
-    time_t timeout = h->timeout.read();
-    if (timeout && timeout < now) {
-      ldout(m_cct, 1) << "is_healthy '" << h->name << "'" 
-                     << " timed out after " << h->grace << dendl;
+    if (!_check(h, "is_healthy", now)) {
       healthy = false;
     }
   }
index 2cdadd8a503ec751a4eb2f36dadacc17d2aee26d..e6d736f61a1945dada11ffbc99a2d391c4176489 100644 (file)
@@ -41,12 +41,12 @@ namespace ceph {
 
 struct heartbeat_handle_d {
   std::string name;
-  atomic_t timeout;
-  time_t grace;
+  atomic_t timeout, suicide_timeout;
+  time_t grace, suicide_grace;
   std::list<heartbeat_handle_d*>::iterator list_item;
 
   heartbeat_handle_d(const std::string& n)
-    : name(n), grace(0)
+    : name(n), grace(0), suicide_grace(0)
   { }
 };
 
@@ -57,7 +57,7 @@ class HeartbeatMap {
   void remove_worker(heartbeat_handle_d *h);
 
   // reset the timeout so that it expects another touch within grace amount of time
-  void reset_timeout(heartbeat_handle_d *h, time_t grace);
+  void reset_timeout(heartbeat_handle_d *h, time_t grace, time_t suicide_grace);
   // clear the timeout so that it's not checked on
   void clear_timeout(heartbeat_handle_d *h);
 
@@ -74,6 +74,8 @@ class HeartbeatMap {
   CephContext *m_cct;
   RWLock m_rwlock;
   std::list<heartbeat_handle_d*> m_workers;
+
+  bool _check(heartbeat_handle_d *h, const char *who, time_t now);
 };
 
 }
index 0a24b5aff9745ea44b9ac634aef8ae55a2547065..c2de9f01e8888f3a8f5dd686d885e6e744da1789 100644 (file)
@@ -50,7 +50,7 @@ void ThreadPool::worker()
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item << dendl;
          _lock.Unlock();
-         cct->get_heartbeat_map()->reset_timeout(hb, wq->timeout_interval);
+         cct->get_heartbeat_map()->reset_timeout(hb, wq->timeout_interval, wq->suicide_interval);
          wq->_void_process(item);
          _lock.Lock();
          wq->_void_process_finish(item);
@@ -67,7 +67,7 @@ void ThreadPool::worker()
     }
 
     ldout(cct,15) << "worker waiting" << dendl;
-    cct->get_heartbeat_map()->reset_timeout(hb, 4);
+    cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
     _cond.WaitInterval(cct, _lock, utime_t(2, 0));
   }
   ldout(cct,1) << "worker finish" << dendl;
index cee53d17ef02d4b4d078e9a637acafb0fb169f8b..788af067ec543ec9353d12a4c5640abcee8b4273 100644 (file)
@@ -32,9 +32,9 @@ class ThreadPool {
 
   struct WorkQueue_ {
     string name;
-    time_t timeout_interval;
-    WorkQueue_(string n, time_t ti)
-      : name(n), timeout_interval(ti)
+    time_t timeout_interval, suicide_interval;
+    WorkQueue_(string n, time_t ti, time_t sti)
+      : name(n), timeout_interval(ti), suicide_interval(sti)
     { }
     virtual ~WorkQueue_() {}
     virtual void _clear() = 0;
@@ -66,7 +66,7 @@ public:
     }
 
   public:
-    WorkQueue(string n, time_t ti, ThreadPool* p) : WorkQueue_(n, ti), pool(p) {
+    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(n, ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
     ~WorkQueue() {
index 7c2c20896e45b50e2fd915c21ec9510bcf59f122..5612417937f8576d3c4adf56462415f712b63545 100644 (file)
@@ -428,6 +428,7 @@ struct config_option config_optionsp[] = {
   OPTION(rgw_cache_lru_size, OPT_INT, 10000),   // num of entries in rgw cache
   OPTION(rgw_socket_path, OPT_STR, NULL),   // path to unix domain socket, if not specified, rgw will not run as external fcgi
   OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
+  OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 60*60),
 
   // see config.h
   OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
index 5481f4b730a369adc27bde22fbd998e4baf31d11..97b2c291666fc44780345c2f83774728d6aa4cef 100644 (file)
@@ -562,6 +562,7 @@ public:
   int   rgw_cache_lru_size;
   string rgw_socket_path;
   int rgw_op_thread_timeout;
+  int rgw_op_thread_suicide_timeout;
 
   // This will be set to true when it is safe to start threads.
   // Once it is true, it will never change.
index dbe9d644c7be7fafa50c0c763caa8c2a6d1c4769..3c1e01b70072c7cd48b211dc4e1bbef05b3035ec 100644 (file)
@@ -172,8 +172,8 @@ class FileStore : public JournalingObjectStore {
   ThreadPool op_tp;
   struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
     FileStore *store;
-    OpWQ(FileStore *fs, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", ti, tp), store(fs) {}
+    OpWQ(FileStore *fs, time_t timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, 0, tp), store(fs) {}
 
     bool _enqueue(OpSequencer *osr) {
       store->op_queue.push_back(osr);
index 494e638a0415eb75b6ee02224702a17a3433f69f..2068e7c9cfb6ff90df9a0de32113d8d23fee5532 100644 (file)
@@ -318,7 +318,7 @@ private:
   struct OpWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     OpWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, 0, tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       pg->get();
@@ -596,7 +596,7 @@ protected:
   struct BacklogWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     BacklogWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::BacklogWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return osd->backlog_queue.empty();
@@ -648,7 +648,7 @@ protected:
   struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     RecoveryWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return osd->recovery_queue.empty();
@@ -723,7 +723,7 @@ protected:
   struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     SnapTrimWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return osd->snap_trim_queue.empty();
@@ -783,7 +783,7 @@ protected:
   struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     ScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return osd->scrub_queue.empty();
@@ -827,7 +827,7 @@ protected:
 
   public:
     ScrubFinalizeWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::ScrubFinalizeWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return scrub_finalize_queue.empty();
@@ -872,7 +872,7 @@ protected:
 
   public:
     RepScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return rep_scrub_queue.empty();
@@ -919,7 +919,7 @@ protected:
   struct RemoveWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
     RemoveWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, tp), osd(o) {}
+      : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, 0, tp), osd(o) {}
 
     bool _empty() {
       return osd->remove_queue.empty();
index c5de7807757e8c3a9031f5c180de1cabf193b78f..d2ab33318f48df227b93f1528a01d7b761495c7f 100644 (file)
@@ -37,7 +37,7 @@ class ExportLocalFileWQ : public RadosSyncWQ {
 public:
   ExportLocalFileWQ(IoCtxDistributor *io_ctx_dist, time_t ti,
                    ThreadPool *tp, ExportDir *export_dir, bool force)
-    : RadosSyncWQ(io_ctx_dist, ti, tp),
+    : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
       m_export_dir(export_dir),
       m_force(force)
   {
@@ -135,7 +135,7 @@ class ExportValidateExistingWQ : public RadosSyncWQ {
 public:
   ExportValidateExistingWQ(IoCtxDistributor *io_ctx_dist, time_t ti,
                           ThreadPool *tp, const char *dir_name)
-    : RadosSyncWQ(io_ctx_dist, ti, tp),
+    : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
       m_dir_name(dir_name)
   {
   }
index 6ca159905e1757463533770d2ac4425ac62d3ebb..49a10245119102e1c31a60a9150f88e2fbefbb78 100644 (file)
@@ -37,7 +37,7 @@ class ImportLocalFileWQ : public RadosSyncWQ {
 public:
   ImportLocalFileWQ(const char *dir_name, bool force,
                    IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
-    : RadosSyncWQ(io_ctx_dist, ti, tp),
+    : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
       m_dir_name(dir_name),
       m_force(force)
   {
@@ -161,7 +161,7 @@ class ImportValidateExistingWQ : public RadosSyncWQ {
 public:
   ImportValidateExistingWQ(ExportDir *export_dir,
                 IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
-    : RadosSyncWQ(io_ctx_dist, ti, tp),
+    : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
       m_export_dir(export_dir)
   {
   }
index 4eb88c5bab7172221f2d7d43d28a0c5675fabbca..1f6b313bac5a349dd7fa346a1051c7798688a505 100644 (file)
@@ -304,8 +304,8 @@ IoCtxDistributor::~IoCtxDistributor() {
   clear();
 }
 
-RadosSyncWQ::RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
-  : ThreadPool::WorkQueue<std::string>("FileStore::OpWQ", ti, tp),
+RadosSyncWQ::RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+  : ThreadPool::WorkQueue<std::string>("FileStore::OpWQ", timeout, suicide_timeout, tp),
     m_io_ctx_dist(io_ctx_dist)
 {
 }
index e60dff1edc90c9bde02d58821e444e732f006065..7e53f491c58d4b21b765d96dc80ee9649b3d5618 100644 (file)
@@ -121,7 +121,7 @@ private:
 
 class RadosSyncWQ : public ThreadPool::WorkQueue<std::string> {
 public:
-  RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp);
+  RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t timeout, time_t suicide_timeout, ThreadPool *tp);
 protected:
   IoCtxDistributor *m_io_ctx_dist;
 private:
index 1e158ea9e0dfddfd03f40b86739aeb534a7f2ee5..3a6b833a524f8926ae8c97e277a4ee2ff9daf71b 100644 (file)
@@ -61,8 +61,8 @@ class RGWProcess {
 
   struct RGWWQ : public ThreadPool::WorkQueue<FCGX_Request> {
     RGWProcess *process;
-    RGWWQ(RGWProcess *p, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", ti, tp), process(p) {}
+    RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
 
     bool _enqueue(FCGX_Request *req) {
       process->m_fcgx_queue.push_back(req);
@@ -107,7 +107,8 @@ class RGWProcess {
 public:
   RGWProcess(CephContext *cct, int num_threads)
     : m_tp(cct, "RGWProcess::m_tp", num_threads),
-      req_wq(this, g_conf->rgw_op_thread_timeout, &m_tp) {}
+      req_wq(this, g_conf->rgw_op_thread_timeout,
+            g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
   void run();
   void handle_request(FCGX_Request *fcgx);
 };